Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions docs/content/multiprocess/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,45 @@ from prometheus_client import Gauge
# Example gauge
IN_PROGRESS = Gauge("inprogress_requests", "help", multiprocess_mode='livesum')
```

**5. Customizing metric values**:

It's possible to customize the behavior of metric values by providing your own implementation of the `ValueClass`. This is useful if you want to add logging, custom synchronization, or change the data storage mechanism.

The `MmapedValue` and `MutexValue` classes are available in `prometheus_client.values` for this purpose. These are top-level classes, which makes it easy to inherit from them and override their methods.

To provide a custom `ValueClass`, set the `PROMETHEUS_VALUE_CLASS` environment variable to the full Python path of your class (e.g., `myapp.custom_values.MyValueClass`).

The class should inherit from `prometheus_client.values.MutexValue` (for single-process applications) or `prometheus_client.values.MmapedValue` (for multiprocess applications) to reuse the existing logic.

#### Example: Custom Mmaped Value

If you're using multiprocess mode and want to override the default increment behavior:

```python
# myapp/custom_values.py
from prometheus_client.values import MmapedValue

class MyMmapedValue(MmapedValue):
def inc(self, amount):
print(f"Incrementing metric by {amount}")
# Always call the superclass method to ensure the value is
# correctly stored and shared state is handled.
super().inc(amount)
```

Then, set the environment variable:

```bash
export PROMETHEUS_VALUE_CLASS=myapp.custom_values.MyMmapedValue
```

#### Behavior and Requirements:
- The environment variable must be set before any metric is instantiated. Therefore, preferrably, before python process start.
- The path must be a valid Python path to a class (including the class name).
- If the class cannot be imported, an `ImportError` will be raised during initialization.
- By default, `prometheus_client` uses `MmapedValue` if `PROMETHEUS_MULTIPROC_DIR` is set, and `MutexValue` otherwise.

**6. Advanced Customization with `MultiProcessValue`**:

For specialized use cases where you need a different process identifier than `os.getpid()`, you can use the `MultiProcessValue(process_identifier)` factory function. This returns a subclass of `MmapedValue` that uses the provided function to identify the process. Note that this cannot be set via the `PROMETHEUS_VALUE_CLASS` environment variable.
180 changes: 98 additions & 82 deletions prometheus_client/values.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import importlib
import os
from threading import Lock
import warnings
Expand Down Expand Up @@ -36,6 +37,82 @@ def get_exemplar(self):
return self._exemplar


class MmapedValue:
"""A float protected by a mutex backed by a per-process mmaped file."""

_multiprocess = True
_files = {}
_values = []
_pid = {'value': os.getpid()}
_lock = Lock()
_process_identifier = staticmethod(os.getpid)

def __init__(self, typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode='', **kwargs):
self._params = typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode
# This deprecation warning can go away in a few releases when removing the compatibility
if 'prometheus_multiproc_dir' in os.environ and 'PROMETHEUS_MULTIPROC_DIR' not in os.environ:
os.environ['PROMETHEUS_MULTIPROC_DIR'] = os.environ['prometheus_multiproc_dir']
warnings.warn("prometheus_multiproc_dir variable has been deprecated in favor of the upper case naming PROMETHEUS_MULTIPROC_DIR", DeprecationWarning)
with self._lock:
self.__check_for_pid_change()
self.__reset()
self._values.append(self)

def __reset(self):
typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode = self._params
if typ == 'gauge':
file_prefix = typ + '_' + multiprocess_mode
else:
file_prefix = typ
if file_prefix not in self._files:
filename = os.path.join(
os.environ.get('PROMETHEUS_MULTIPROC_DIR'),
'{}_{}.db'.format(file_prefix, self._pid['value']))

self._files[file_prefix] = MmapedDict(filename)
self._file = self._files[file_prefix]
self._key = mmap_key(metric_name, name, labelnames, labelvalues, help_text)
self._value, self._timestamp = self._file.read_value(self._key)

def __check_for_pid_change(self):
actual_pid = self._process_identifier()
if self._pid['value'] != actual_pid:
self._pid['value'] = actual_pid
# There has been a fork(), reset all the values.
for f in self._files.values():
f.close()
self._files.clear()
for value in self._values:
value.__reset()

def inc(self, amount):
with self._lock:
self.__check_for_pid_change()
self._value += amount
self._timestamp = 0.0
self._file.write_value(self._key, self._value, self._timestamp)

def set(self, value, timestamp=None):
with self._lock:
self.__check_for_pid_change()
self._value = value
self._timestamp = timestamp or 0.0
self._file.write_value(self._key, self._value, self._timestamp)

def set_exemplar(self, exemplar):
# TODO: Implement exemplars for multiprocess mode.
return

def get(self):
with self._lock:
self.__check_for_pid_change()
return self._value

def get_exemplar(self):
# TODO: Implement exemplars for multiprocess mode.
return None


def MultiProcessValue(process_identifier=os.getpid):
"""Returns a MmapedValue class based on a process_identifier function.

Expand All @@ -44,96 +121,35 @@ def MultiProcessValue(process_identifier=os.getpid):

Using a different function than the default 'os.getpid' is at your own risk.
"""
files = {}
values = []
pid = {'value': process_identifier()}
# Use a single global lock when in multi-processing mode
# as we presume this means there is no threading going on.
# This avoids the need to also have mutexes in __MmapDict.
lock = Lock()

class MmapedValue:
"""A float protected by a mutex backed by a per-process mmaped file."""

_multiprocess = True

def __init__(self, typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode='', **kwargs):
self._params = typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode
# This deprecation warning can go away in a few releases when removing the compatibility
if 'prometheus_multiproc_dir' in os.environ and 'PROMETHEUS_MULTIPROC_DIR' not in os.environ:
os.environ['PROMETHEUS_MULTIPROC_DIR'] = os.environ['prometheus_multiproc_dir']
warnings.warn("prometheus_multiproc_dir variable has been deprecated in favor of the upper case naming PROMETHEUS_MULTIPROC_DIR", DeprecationWarning)
with lock:
self.__check_for_pid_change()
self.__reset()
values.append(self)

def __reset(self):
typ, metric_name, name, labelnames, labelvalues, help_text, multiprocess_mode = self._params
if typ == 'gauge':
file_prefix = typ + '_' + multiprocess_mode
else:
file_prefix = typ
if file_prefix not in files:
filename = os.path.join(
os.environ.get('PROMETHEUS_MULTIPROC_DIR'),
'{}_{}.db'.format(file_prefix, pid['value']))

files[file_prefix] = MmapedDict(filename)
self._file = files[file_prefix]
self._key = mmap_key(metric_name, name, labelnames, labelvalues, help_text)
self._value, self._timestamp = self._file.read_value(self._key)

def __check_for_pid_change(self):
actual_pid = process_identifier()
if pid['value'] != actual_pid:
pid['value'] = actual_pid
# There has been a fork(), reset all the values.
for f in files.values():
f.close()
files.clear()
for value in values:
value.__reset()

def inc(self, amount):
with lock:
self.__check_for_pid_change()
self._value += amount
self._timestamp = 0.0
self._file.write_value(self._key, self._value, self._timestamp)

def set(self, value, timestamp=None):
with lock:
self.__check_for_pid_change()
self._value = value
self._timestamp = timestamp or 0.0
self._file.write_value(self._key, self._value, self._timestamp)

def set_exemplar(self, exemplar):
# TODO: Implement exemplars for multiprocess mode.
return

def get(self):
with lock:
self.__check_for_pid_change()
return self._value

def get_exemplar(self):
# TODO: Implement exemplars for multiprocess mode.
return None

return MmapedValue
class _MmapedValue(MmapedValue):
_files = {}
_values = []
_pid = {'value': process_identifier()}
_lock = Lock()
_process_identifier = staticmethod(process_identifier)

return _MmapedValue


def get_value_class():
# Should we enable multi-process mode?
# This needs to be chosen before the first metric is constructed,
# and as that may be in some arbitrary library the user/admin has
# no control over we use an environment variable.
value_class_path = os.environ.get('PROMETHEUS_VALUE_CLASS')
if value_class_path:
if '.' not in value_class_path:
raise ImportError(f"PROMETHEUS_VALUE_CLASS must be a full python path (e.g. module.ClassName), got '{value_class_path}'")
try:
module_path, class_name = value_class_path.rsplit('.', 1)
module = importlib.import_module(module_path)
return getattr(module, class_name)
except (ImportError, AttributeError) as e:
raise ImportError(f"Could not import PROMETHEUS_VALUE_CLASS '{value_class_path}': {e}") from None
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise ImportError(f"Could not import PROMETHEUS_VALUE_CLASS '{value_class_path}': {e}") from None
raise ImportError(f"Could not import PROMETHEUS_VALUE_CLASS '{value_class_path}'") from e


if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
return MultiProcessValue()
else:
return MutexValue
return MmapedValue
return MutexValue


ValueClass = get_value_class()
68 changes: 68 additions & 0 deletions tests/test_multiprocess.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import glob
import importlib
import os
import shutil
import tempfile
Expand Down Expand Up @@ -646,3 +647,70 @@ def test_file_syncpath(self):

def tearDown(self):
os.remove(self.tmpfl)


class TestCustomValueClass(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
os.environ['PROMETHEUS_MULTIPROC_DIR'] = self.tmpdir

def tearDown(self):
shutil.rmtree(self.tmpdir)
# Restore default ValueClass
values.ValueClass = values.get_value_class()

def test_custom_value_class(self):
class MyCustomValue(values.MmapedValue):
def inc(self, amount):
super().inc(amount * 2) # Double the increment

values.ValueClass = MyCustomValue

c = Counter('my_counter', 'help')
c.inc(5)

self.assertEqual(c._value.get(), 10)
self.assertIsInstance(c._value, MyCustomValue)


class TestValueClassEnv(unittest.TestCase):
def setUp(self):
self.original_env = os.environ.get('PROMETHEUS_VALUE_CLASS')
if 'PROMETHEUS_VALUE_CLASS' in os.environ:
del os.environ['PROMETHEUS_VALUE_CLASS']

def tearDown(self):
if self.original_env:
os.environ['PROMETHEUS_VALUE_CLASS'] = self.original_env
elif 'PROMETHEUS_VALUE_CLASS' in os.environ:
del os.environ['PROMETHEUS_VALUE_CLASS']
# Reset ValueClass to default for other tests
importlib.reload(values)

def test_default_value_class(self):
importlib.reload(values)
self.assertEqual(values.ValueClass, values.MutexValue)

def test_multiproc_value_class(self):
os.environ['PROMETHEUS_MULTIPROC_DIR'] = '/tmp'
importlib.reload(values)
self.assertEqual(values.ValueClass, values.MmapedValue)
del os.environ['PROMETHEUS_MULTIPROC_DIR']

def test_env_var_value_class(self):
# We need a class to point to. Let's use MutexValue itself but via string
os.environ['PROMETHEUS_VALUE_CLASS'] = 'prometheus_client.values.MutexValue'
importlib.reload(values)
self.assertEqual(values.ValueClass, values.MutexValue)
Comment on lines +700 to +704
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't test anything because MutexValue is the default. Better point at something non-default, maybe to values.MmapedValue or even some random class from stdlib (we won't use it anyway, just need to check assignment)


def test_invalid_path_fails_loudly(self):
os.environ['PROMETHEUS_VALUE_CLASS'] = 'invalid.path.Class'
with self.assertRaises(ImportError) as cm:
importlib.reload(values)
self.assertIn("Could not import PROMETHEUS_VALUE_CLASS", str(cm.exception))

def test_no_dot_fails_loudly(self):
os.environ['PROMETHEUS_VALUE_CLASS'] = 'NoDot'
with self.assertRaises(ImportError) as cm:
importlib.reload(values)
self.assertIn("PROMETHEUS_VALUE_CLASS must be a full python path", str(cm.exception))