Skip to content

Commit 675c44b

Browse files
authored
feat(v2): full implementation for kafka bindings (#253)
* feat: full implementation of kafka bindings Signed-off-by: Tudor Plugaru <[email protected]> * chore: no need to re-export functions from core module. let users explicitly import Signed-off-by: Tudor Plugaru <[email protected]> --------- Signed-off-by: Tudor Plugaru <[email protected]>
1 parent db71318 commit 675c44b

File tree

5 files changed

+1027
-65
lines changed

5 files changed

+1027
-65
lines changed

src/cloudevents/core/bindings/__init__.py

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,9 @@
1212
# License for the specific language governing permissions and limitations
1313
# under the License.
1414

15-
from cloudevents.core.bindings.http import (
16-
HTTPMessage,
17-
from_binary,
18-
from_http,
19-
from_structured,
20-
to_binary,
21-
to_structured,
22-
)
15+
"""
16+
CloudEvents protocol bindings.
2317
24-
__all__ = [
25-
"HTTPMessage",
26-
"to_binary",
27-
"from_binary",
28-
"to_structured",
29-
"from_structured",
30-
"from_http",
31-
]
18+
This package provides protocol-specific bindings for CloudEvents, including HTTP and Kafka.
19+
Each binding module provides functions to convert CloudEvents to/from protocol-specific messages.
20+
"""
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# Copyright 2018-Present The CloudEvents Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
15+
"""
16+
Common utilities for CloudEvents protocol bindings.
17+
18+
This module provides shared functionality for protocol bindings (HTTP, Kafka, etc.)
19+
to handle CloudEvent attribute encoding and decoding per the CloudEvents specification.
20+
"""
21+
22+
from datetime import datetime
23+
from typing import Any, Final
24+
from urllib.parse import quote, unquote
25+
26+
from dateutil.parser import isoparse
27+
28+
TIME_ATTR: Final[str] = "time"
29+
CONTENT_TYPE_HEADER: Final[str] = "content-type"
30+
DATACONTENTTYPE_ATTR: Final[str] = "datacontenttype"
31+
32+
33+
def encode_header_value(value: Any) -> str:
34+
"""
35+
Encode a CloudEvent attribute value for use in a protocol header.
36+
37+
Handles special encoding for datetime objects (ISO 8601 with 'Z' suffix for UTC)
38+
and applies percent-encoding for non-ASCII and special characters per RFC 3986.
39+
40+
:param value: The attribute value to encode
41+
:return: Percent-encoded string suitable for protocol headers
42+
"""
43+
if isinstance(value, datetime):
44+
str_value = value.isoformat()
45+
if str_value.endswith("+00:00"):
46+
str_value = str_value[:-6] + "Z"
47+
return quote(str_value, safe="")
48+
49+
return quote(str(value), safe="")
50+
51+
52+
def decode_header_value(attr_name: str, value: str) -> Any:
53+
"""
54+
Decode a CloudEvent attribute value from a protocol header.
55+
56+
Applies percent-decoding and special parsing for the 'time' attribute
57+
(converts to datetime object using RFC 3339 parsing).
58+
59+
:param attr_name: The name of the CloudEvent attribute
60+
:param value: The percent-encoded header value
61+
:return: Decoded value (datetime for 'time' attribute, string otherwise)
62+
"""
63+
decoded = unquote(value)
64+
65+
if attr_name == TIME_ATTR:
66+
return isoparse(decoded)
67+
68+
return decoded

src/cloudevents/core/bindings/http.py

Lines changed: 12 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,18 @@
1313
# under the License.
1414

1515
from dataclasses import dataclass
16-
from datetime import datetime
1716
from typing import Any, Callable, Final
18-
from urllib.parse import quote, unquote
19-
20-
from dateutil.parser import isoparse
2117

2218
from cloudevents.core.base import BaseCloudEvent
19+
from cloudevents.core.bindings.common import (
20+
CONTENT_TYPE_HEADER,
21+
DATACONTENTTYPE_ATTR,
22+
decode_header_value,
23+
encode_header_value,
24+
)
2325
from cloudevents.core.formats.base import Format
2426

2527
CE_PREFIX: Final[str] = "ce-"
26-
CONTENT_TYPE_HEADER: Final[str] = "content-type"
2728

2829

2930
@dataclass(frozen=True)
@@ -44,44 +45,6 @@ class HTTPMessage:
4445
body: bytes
4546

4647

47-
def _encode_header_value(value: Any) -> str:
48-
"""
49-
Encode a CloudEvent attribute value for use in an HTTP header.
50-
51-
Handles special encoding for datetime objects (ISO 8601 with 'Z' suffix for UTC)
52-
and applies percent-encoding for non-ASCII and special characters per RFC 3986.
53-
54-
:param value: The attribute value to encode
55-
:return: Percent-encoded string suitable for HTTP headers
56-
"""
57-
if isinstance(value, datetime):
58-
str_value = value.isoformat()
59-
if str_value.endswith("+00:00"):
60-
str_value = str_value[:-6] + "Z"
61-
return quote(str_value, safe="")
62-
63-
return quote(str(value), safe="")
64-
65-
66-
def _decode_header_value(attr_name: str, value: str) -> Any:
67-
"""
68-
Decode a CloudEvent attribute value from an HTTP header.
69-
70-
Applies percent-decoding and special parsing for the 'time' attribute
71-
(converts to datetime object using RFC 3339 parsing).
72-
73-
:param attr_name: The name of the CloudEvent attribute
74-
:param value: The percent-encoded header value
75-
:return: Decoded value (datetime for 'time' attribute, string otherwise)
76-
"""
77-
decoded = unquote(value)
78-
79-
if attr_name == "time":
80-
return isoparse(decoded)
81-
82-
return decoded
83-
84-
8548
def to_binary(event: BaseCloudEvent, event_format: Format) -> HTTPMessage:
8649
"""
8750
Convert a CloudEvent to HTTP binary content mode.
@@ -113,14 +76,14 @@ def to_binary(event: BaseCloudEvent, event_format: Format) -> HTTPMessage:
11376
if attr_value is None:
11477
continue
11578

116-
if attr_name == "datacontenttype":
79+
if attr_name == DATACONTENTTYPE_ATTR:
11780
headers[CONTENT_TYPE_HEADER] = str(attr_value)
11881
else:
11982
header_name = f"{CE_PREFIX}{attr_name}"
120-
headers[header_name] = _encode_header_value(attr_value)
83+
headers[header_name] = encode_header_value(attr_value)
12184

12285
data = event.get_data()
123-
datacontenttype = attributes.get("datacontenttype")
86+
datacontenttype = attributes.get(DATACONTENTTYPE_ATTR)
12487
body = event_format.write_data(data, datacontenttype)
12588

12689
return HTTPMessage(headers=headers, body=body)
@@ -163,11 +126,11 @@ def from_binary(
163126

164127
if normalized_name.startswith(CE_PREFIX):
165128
attr_name = normalized_name[len(CE_PREFIX) :]
166-
attributes[attr_name] = _decode_header_value(attr_name, header_value)
129+
attributes[attr_name] = decode_header_value(attr_name, header_value)
167130
elif normalized_name == CONTENT_TYPE_HEADER:
168-
attributes["datacontenttype"] = header_value
131+
attributes[DATACONTENTTYPE_ATTR] = header_value
169132

170-
datacontenttype = attributes.get("datacontenttype")
133+
datacontenttype = attributes.get(DATACONTENTTYPE_ATTR)
171134
data = event_format.read_data(message.body, datacontenttype)
172135

173136
return event_factory(attributes, data)

0 commit comments

Comments
 (0)