Skip to content

Commit

Permalink
Indicate which package a schema comes from when missing
Browse files Browse the repository at this point in the history
Fixes: #187
Signed-off-by: Aurélien Bompard <[email protected]>
  • Loading branch information
abompard committed Jul 8, 2024
1 parent 0052817 commit 23ff5d1
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 24 deletions.
63 changes: 55 additions & 8 deletions fedora_messaging/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import datetime
import json
import logging
import re
import uuid
from importlib.metadata import entry_points
from importlib.metadata import distribution as get_distribution
from importlib.metadata import entry_points, PackageNotFoundError

import jsonschema
import pika
Expand Down Expand Up @@ -65,6 +67,7 @@
# Maps string names of message types to classes and back
_schema_name_to_class = {}
_class_to_schema_name = {}
_schema_name_to_package = {}

# Used to load the registry automatically on first use
_registry_loaded = False
Expand All @@ -83,18 +86,32 @@ def get_class(schema_name):
Returns:
Message: A sub-class of :class:`Message` to create the message from.
"""

return _get_class_from_headers(dict(fedora_messaging_schema=schema_name))


def _get_class_from_headers(headers):
global _registry_loaded
if not _registry_loaded:
load_message_classes()

schema_name = headers["fedora_messaging_schema"]
try:
return _schema_name_to_class[schema_name]
except KeyError:
schema_package = headers.get("fedora_messaging_schema_package")
if schema_package:
package_text = f"You can install the missing schema from package {schema_package!r}"
else:
package_text = (
"Either install the package with its schema definition or define a schema"
)

_log.warning(
'The schema "%s" is not in the schema registry! Either install '
"the package with its schema definition or define a schema. "
'The schema "%s" is not in the schema registry! %s. '
"Falling back to the default schema...",
schema_name,
package_text,
)
return Message

Expand Down Expand Up @@ -124,6 +141,24 @@ def get_name(cls):
) from e


def _get_distribution_from_module(module):
if not module:
return None
module_parts = module.split(".")
while module_parts:
try:
distribution = get_distribution(".".join(module_parts))
try:
distribution_name = distribution.name
except AttributeError: # pragma: no cover
# COMPAT: Python <= 3.9
distribution_name = distribution.metadata["Name"]
except PackageNotFoundError:
return _get_distribution_from_module(".".join(module_parts[:-1]))
# Normalize the name: PEP 503 plus dashes as underscores.
return re.sub(r"[-_.]+", "-", distribution_name).lower().replace("-", "_")


def load_message_classes():
"""Load the 'fedora.messages' entry points and register the message classes."""
try:
Expand All @@ -141,6 +176,12 @@ def load_message_classes():
)
_schema_name_to_class[message.name] = cls
_class_to_schema_name[cls] = message.name
try:
module = message.module
except AttributeError: # pragma: no cover
# COMPAT: Python <= 3.8
module = message.pattern.match(message.value).group("module")
_schema_name_to_package[message.name] = _get_distribution_from_module(module)
global _registry_loaded
_registry_loaded = True

Expand All @@ -167,7 +208,7 @@ def get_message(routing_key, properties, body):
properties.headers = {}

try:
MessageClass = get_class(properties.headers["fedora_messaging_schema"])
MessageClass = _get_class_from_headers(properties.headers)
except KeyError:
_log.error(
"Message (headers=%r, body=%r) arrived without a schema header."
Expand Down Expand Up @@ -320,6 +361,7 @@ class attribute, although this is a convenient approach. Users are
"enum": [DEBUG, INFO, WARNING, ERROR],
},
"fedora_messaging_schema": {"type": "string"},
"fedora_messaging_schema_package": {"type": "string"},
"sent-at": {"type": "string"},
},
}
Expand Down Expand Up @@ -347,7 +389,10 @@ def _build_properties(self, headers):
# Consumers use this to determine what schema to use and if they're out
# of date.
headers = headers.copy()
headers["fedora_messaging_schema"] = get_name(self.__class__)
headers["fedora_messaging_schema"] = schema_name = get_name(self.__class__)
schema_package = _schema_name_to_package.get(schema_name)
if schema_package:
headers["fedora_messaging_schema_package"] = schema_package
now = datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0)
headers["sent-at"] = now.isoformat()
headers["fedora_messaging_severity"] = self.severity
Expand Down Expand Up @@ -723,9 +768,11 @@ def load_message(message_dict):
jsonschema.validate(message_dict, SERIALIZED_MESSAGE_SCHEMA)
except jsonschema.exceptions.ValidationError as e:
raise ValidationError(e) from e
MessageClass = get_class(
message_dict.get("headers", {}).get("fedora_messaging_schema", "base.message")
)
try:
MessageClass = _get_class_from_headers(message_dict.get("headers", {}))
except KeyError:
# No "fedora_messaging_schema" header
MessageClass = Message
message = MessageClass(
body=message_dict["body"],
topic=message_dict["topic"],
Expand Down
1 change: 1 addition & 0 deletions news/187.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Indicate which package a schema comes from when missing
1 change: 1 addition & 0 deletions tests/integration/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def test_twisted_consume_halt_consumer(queue_and_binding):
expected_headers = {
"fedora_messaging_severity": 20,
"fedora_messaging_schema": "base.message",
"fedora_messaging_schema_package": "fedora_messaging",
"priority": 0,
"niceness": "very",
}
Expand Down
14 changes: 9 additions & 5 deletions tests/unit/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,8 +703,10 @@ def test_save_recorded_messages_when_limit_is_reached(self):
test_recorder = cli.Recorder(2, mock_file)
test_recorder.collect_message(msg1)
mock_file.write.assert_called_with(
'{"body": {"test_key1": "test_value1"}, "headers"'
': {"fedora_messaging_schema": "base.message", "fedora_messaging_severity": 20, '
'{"body": {"test_key1": "test_value1"}, "headers": {'
'"fedora_messaging_schema": "base.message", '
'"fedora_messaging_schema_package": "fedora_messaging", '
'"fedora_messaging_severity": 20, '
'"priority": 0, "sent-at": "2018-11-18T10:11:41+00:00"}, '
'"id": "273ed91d-b8b5-487a-9576-95b9fbdf3eec", '
'"priority": 0, "queue": null, "topic": "test_topic1"}\n'
Expand All @@ -716,9 +718,11 @@ def test_save_recorded_messages_when_limit_is_reached(self):
assert the_exception.exit_code == 0
assert test_recorder.counter == 2
mock_file.write.assert_called_with(
'{"body": {"test_key2": "test_value2"}, "headers": '
'{"fedora_messaging_schema": "base.message", "fedora_messaging_severity": '
'20, "priority": 0, "sent-at": "2018-11-18T10:11:41+00:00"}, "id": '
'{"body": {"test_key2": "test_value2"}, "headers": {'
'"fedora_messaging_schema": "base.message", '
'"fedora_messaging_schema_package": "fedora_messaging", '
'"fedora_messaging_severity": 20, '
'"priority": 0, "sent-at": "2018-11-18T10:11:41+00:00"}, "id": '
'"273ed91d-b8b5-487a-9576-95b9fbdf3eec", "priority": 0, "queue": null, '
'"topic": "test_topic2"}\n'
)
Expand Down
89 changes: 78 additions & 11 deletions tests/unit/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,23 @@ def test_missing_headers(self):
)
assert isinstance(received_msg, message.Message)

def test_missing_schema(self, caplog):
"""Assert a missing schema package gives an informative log."""
msg = message.Message()
msg._headers = {
"fedora_messaging_schema": "dummy",
"fedora_messaging_schema_package": "dummy-package",
"fedora_messaging_severity": message.INFO,
}
received_msg = message.get_message(
msg._encoded_routing_key, msg._properties, msg._encoded_body
)
assert isinstance(received_msg, message.Message)
assert caplog.messages == [
'The schema "dummy" is not in the schema registry! You can install the missing schema '
"from package 'dummy-package'. Falling back to the default schema..."
]

@mock.patch.dict(message._class_to_schema_name, {DeprecatedMessage: "deprecated_message_id"})
@mock.patch.dict(message._schema_name_to_class, {"deprecated_message_id": DeprecatedMessage})
def test_deprecated(self, caplog):
Expand All @@ -86,6 +103,7 @@ def test_proper_message(self):
test_id = "test id"
test_headers = {
"fedora_messaging_schema": "base.message",
"fedora_messaging_schema_package": "fedora_messaging",
"fedora_messaging_severity": message.WARNING,
}
test_properties = pika.BasicProperties(
Expand All @@ -100,9 +118,11 @@ def test_proper_message(self):

test_msg.queue = test_queue
expected_json = (
'{"body": {"test_key": "test_value"}, "headers": {"fedora_messaging_schema": '
'"base.message", "fedora_messaging_severity": 30}, "id": "test id", '
'"priority": 2, "queue": "test queue", "topic": "test topic"}\n'
'{"body": {"test_key": "test_value"}, "headers": {'
'"fedora_messaging_schema": "base.message", '
'"fedora_messaging_schema_package": "fedora_messaging", '
'"fedora_messaging_severity": 30'
'}, "id": "test id", "priority": 2, "queue": "test queue", "topic": "test topic"}\n'
)
assert expected_json == message.dumps(test_msg)

Expand All @@ -114,6 +134,7 @@ def test_proper_message_multiple(self):
test_id = "test id"
test_headers = {
"fedora_messaging_schema": "base.message",
"fedora_messaging_schema_package": "fedora_messaging",
"fedora_messaging_severity": message.WARNING,
}
test_properties = pika.BasicProperties(
Expand All @@ -128,11 +149,17 @@ def test_proper_message_multiple(self):
test_msg.queue = test_queue
test_msg2.queue = test_queue
expected_json = (
'{"body": {"test_key": "test_value"}, "headers": {"fedora_messaging_schema": '
'"base.message", "fedora_messaging_severity": 30}, "id": "test id", '
'{"body": {"test_key": "test_value"}, "headers": {'
'"fedora_messaging_schema": "base.message", '
'"fedora_messaging_schema_package": "fedora_messaging", '
'"fedora_messaging_severity": 30'
'}, "id": "test id", '
'"priority": 0, "queue": "test queue", "topic": "test topic"}\n'
'{"body": {"test_key": "test_value"}, "headers": {"fedora_messaging_schema": '
'"base.message", "fedora_messaging_severity": 30}, "id": "test id", '
'{"body": {"test_key": "test_value"}, "headers": {'
'"fedora_messaging_schema": "base.message", '
'"fedora_messaging_schema_package": "fedora_messaging", '
'"fedora_messaging_severity": 30'
'}, "id": "test id", '
'"priority": 0, "queue": "test queue", "topic": "test topic"}\n'
)

Expand All @@ -152,8 +179,11 @@ class TestMessageLoads:
def test_proper_json(self):
"""Assert loading single message from json work."""
message_json = (
'{"topic": "test topic", "headers": {"fedora_messaging_schema": "base.message", '
'"fedora_messaging_severity": 30}, "id": "test id", "body": '
'{"topic": "test topic", "headers": {'
'"fedora_messaging_schema": "base.message", '
'"fedora_messaging_schema_package": "fedora_messaging", '
'"fedora_messaging_severity": 30'
'}, "id": "test id", "body": '
'{"test_key": "test_value"}, "priority": 2, "queue": "test queue"}\n'
)
messages = message.loads(message_json)
Expand All @@ -167,6 +197,7 @@ def test_proper_json(self):
assert 2 == test_message.priority
assert message.WARNING == test_message._headers["fedora_messaging_severity"]
assert "base.message" == test_message._headers["fedora_messaging_schema"]
assert "fedora_messaging" == test_message._headers["fedora_messaging_schema_package"]

def test_improper_json(self):
"""Assert proper exception is raised when improper json is provided."""
Expand All @@ -184,10 +215,11 @@ def test_missing_headers(self):
}
test_message = message.load_message(message_dict)
assert test_message._headers["fedora_messaging_schema"] == "base.message"
assert test_message._headers["fedora_messaging_schema_package"] == "fedora_messaging"
assert test_message._headers["fedora_messaging_severity"] == message.INFO
assert "sent-at" in test_message._headers

def test_missing_messaging_schema(self):
def test_missing_messaging_schema_header(self, caplog):
"""Assert the default schema is used when messaging schema is missing."""
message_dict = {
"id": "test id",
Expand All @@ -198,6 +230,27 @@ def test_missing_messaging_schema(self):
}
test_message = message.load_message(message_dict)
assert isinstance(test_message, message.Message)
assert caplog.messages == []

def test_missing_messaging_schema(self, caplog):
"""Assert a helpful message is logged when the schema is missing."""
message_dict = {
"id": "test id",
"topic": "test topic",
"headers": {
"fedora_messaging_schema": "dummy",
"fedora_messaging_schema_package": "dummy-package",
"fedora_messaging_severity": 30,
},
"body": {"test_key": "test_value"},
"queue": "test queue",
}
test_message = message.load_message(message_dict)
assert isinstance(test_message, message.Message)
assert caplog.messages == [
'The schema "dummy" is not in the schema registry! You can install the missing schema '
"from package 'dummy-package'. Falling back to the default schema..."
]

def test_missing_body(self):
"""Assert proper exception is raised when body is missing."""
Expand Down Expand Up @@ -366,13 +419,16 @@ def test_properties_default(self):
assert "sent-at" in msg._properties.headers
assert "fedora_messaging_schema" in msg._properties.headers
assert msg._properties.headers["fedora_messaging_schema"] == "base.message"
assert "fedora_messaging_schema_package" in msg._properties.headers
assert msg._properties.headers["fedora_messaging_schema_package"] == "fedora_messaging"

def test_headers(self):
msg = message.Message(headers={"foo": "bar"})
assert "foo" in msg._properties.headers
assert msg._properties.headers["foo"] == "bar"
# The fedora_messaging_schema key must also be added when headers are given.
# The fedora_messaging_schema keys must also be added when headers are given.
assert msg._properties.headers["fedora_messaging_schema"] == "base.message"
assert msg._properties.headers["fedora_messaging_schema_package"] == "fedora_messaging"

def test_severity_default_header_set(self):
"""Assert the default severity is placed in the header if unspecified."""
Expand Down Expand Up @@ -526,9 +582,16 @@ def flatpaks(self):


@mock.patch.dict(message._class_to_schema_name, {CustomMessage: "custom_id"})
@mock.patch.dict(message._schema_name_to_package, {"custom_id": "custom-package"})
class TestCustomMessage:
"""Tests for a Message subclass that provides filter headers"""

def test_schema_headers(self):
"""Assert schema name and package are placed in the message headers."""
msg = CustomMessage(body={})
assert msg._headers.get("fedora_messaging_schema") == "custom_id"
assert msg._headers.get("fedora_messaging_schema_package") == "custom-package"

def test_usernames(self):
"""Assert usernames are placed in the message headers."""
msg = CustomMessage(body={"users": ["jcline", "abompard"]})
Expand Down Expand Up @@ -656,3 +719,7 @@ def test_get_name_autoload_once(self):
with mock.patch.dict(message._class_to_schema_name, {}, clear=True):
with pytest.raises(TypeError):
message.get_name("this.is.not.an.entrypoint")

def test_get_distribution_from_module(self):
"""Assert getting the distribution from a non-existing module does not crash."""
assert message._get_distribution_from_module("does.not.exist") is None

0 comments on commit 23ff5d1

Please sign in to comment.