author     CoprDistGit <infra@openeuler.org>  2023-05-05 06:24:43 +0000
committer  CoprDistGit <infra@openeuler.org>  2023-05-05 06:24:43 +0000
commit     3c141f62f6eb3987ccfbbffeadef7d6765bc5fbe (patch)
tree       5fbbb0e3832fc8b737d37365265ca53079577046
parent     cad138a7c15f79dc75cc1d68bc9f0ec28b7c9964 (diff)
automatic import of python-kafkaesk (openeuler20.03)
-rw-r--r--  .gitignore            |   1
-rw-r--r--  python-kafkaesk.spec  | 914
-rw-r--r--  sources               |   1
3 files changed, 916 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
index e69de29..33f0747 100644
--- a/.gitignore
+++ b/.gitignore
@@ -0,0 +1 @@
+/kafkaesk-0.7.7.tar.gz
diff --git a/python-kafkaesk.spec b/python-kafkaesk.spec
new file mode 100644
index 0000000..b62da8c
--- /dev/null
+++ b/python-kafkaesk.spec
@@ -0,0 +1,914 @@
+%global _empty_manifest_terminate_build 0
+Name: python-kafkaesk
+Version: 0.7.7
+Release: 1
+Summary: Easy publish and subscribe to events with python and Kafka.
+License: BSD License
+URL: https://pypi.org/project/kafkaesk/
+Source0: https://mirrors.nju.edu.cn/pypi/web/packages/19/aa/cd56e3465a5db6fe732b4295617853fb072a1d982fcbc3cb5b5b6635b9bc/kafkaesk-0.7.7.tar.gz
+BuildArch: noarch
+
+Requires: python3-aiokafka
+Requires: python3-kafka-python
+Requires: python3-pydantic
+Requires: python3-orjson
+Requires: python3-jsonschema
+Requires: python3-prometheus_client
+Requires: python3-opentracing
+Requires: python3-async-timeout
+
+%description
+<!-- PROJECT LOGO -->
+<h1 align="center">
+ <br>
+ <img src="https://onna.com/wp-content/uploads/2020/03/h-onna-solid.png" alt="Onna Logo">
+</h1>
+
+<h2 align="center">kafkaesk</h2>
+
+<!-- TABLE OF CONTENTS -->
+## Table Of Contents
+
+- [About the Project](#about-the-project)
+- [Publish](#publish)
+- [Subscribe](#subscribe)
+- [Avoiding global object](#avoiding-global-object)
+- [Manual commit](#manual-commit)
+- [kafkaesk contract](#kafkaesk-contract)
+- [Worker](#worker)
+- [Development](#development)
+- [Extensions](#extensions)
+- [Naming](#naming)
+
+
+## About The Project
+
+This project is meant to help facilitate effortless publishing and subscribing to events with Python and Kafka.
+
+### Guiding principles
+
+- HTTP
+- Language agnostic
+- Contracts built on top of [Kafka](https://kafka.apache.org/)
+
+
+### Alternatives
+ - [aiokafka](https://aiokafka.readthedocs.io/en/stable/): can be complex to scale correctly
+ - [guillotina_kafka](https://github.com/onna/guillotina_kafka): complex, tied to [Guillotina](https://guillotina.readthedocs.io/en/latest/)
+ - [faust](https://faust.readthedocs.io/en/latest/): requires additional data layers, not language agnostic
+ - confluent kafka + avro: close, but ends up feeling like gRPC: schemas must be compiled per language, and there is no asyncio support.
+
+> Consider this Python project as syntactic sugar around these ideas.
+
+## Publish
+
+Examples use [pydantic](https://pydantic-docs.helpmanual.io/), but publishing can also be done with plain JSON.
+
+```python
+import kafkaesk
+from pydantic import BaseModel
+
+app = kafkaesk.Application()
+
+@app.schema("Content", version=1, retention=24 * 60 * 60)
+class ContentMessage(BaseModel):
+ foo: str
+
+
+async def foobar():
+ # ...
+ # doing something in an async func
+ await app.publish("content.edited.Resource", data=ContentMessage(foo="bar"))
+```
+
+A convenience method is available on the `subscriber` dependency instance; it propagates the headers of the consumed message to the one being published.
+
+```python
+import kafkaesk
+from pydantic import BaseModel
+
+app = kafkaesk.Application()
+
+@app.schema("Content", version=1, retention=24 * 60 * 60)
+class ContentMessage(BaseModel):
+ foo: str
+
+
+@app.subscribe("content.*", "group_id")
+async def get_messages(data: ContentMessage, subscriber):
+ print(f"{data.foo}")
+ # This will propagate `data` record headers
+ await subscriber.publish("content.edited.Resource", data=ContentMessage(foo="bar"))
+
+```
+
+## Subscribe
+
+```python
+import kafkaesk
+from pydantic import BaseModel
+
+app = kafkaesk.Application()
+
+@app.schema("Content", version=1, retention=24 * 60 * 60)
+class ContentMessage(BaseModel):
+ foo: str
+
+
+@app.subscribe("content.*", "group_id")
+async def get_messages(data: ContentMessage):
+ print(f"{data.foo}")
+
+```
+
+## Avoiding global object
+
+If you do not want to have global application configuration, you can lazily configure
+the application and register schemas/subscribers separately.
+
+```python
+import kafkaesk
+from pydantic import BaseModel
+
+router = kafkaesk.Router()
+
+@router.schema("Content", version=1, retention=24 * 60 * 60)
+class ContentMessage(BaseModel):
+ foo: str
+
+
+@router.subscribe("content.*", "group_id")
+async def get_messages(data: ContentMessage):
+ print(f"{data.foo}")
+
+
+if __name__ == "__main__":
+ app = kafkaesk.Application()
+ app.mount(router)
+ kafkaesk.run(app)
+
+```
+
+Optional consumer injected parameters:
+
+- schema: str
+- record: aiokafka.structs.ConsumerRecord
+- app: kafkaesk.app.Application
+- subscriber: kafkaesk.app.BatchConsumer
+
+Depending on the type annotation for the first parameter, you will get different data injected:
+
+- `async def get_messages(data: ContentMessage)`: parses the message into the pydantic model
+- `async def get_messages(data: bytes)`: gives the raw byte data
+- `async def get_messages(record: aiokafka.structs.ConsumerRecord)`: gives the Kafka record object
+- `async def get_messages(data)`: gives the raw JSON data from the message
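+
+As a hedged sketch, the handler below combines a raw `ConsumerRecord` annotation with the optional `schema` and `app` parameters listed above (the stream pattern, group id, and handler body are placeholders):
+
+```python
+import kafkaesk
+from aiokafka.structs import ConsumerRecord
+
+app = kafkaesk.Application()
+
+
+@app.subscribe("content.*", "audit_group")
+async def audit_messages(record: ConsumerRecord, schema: str, app: kafkaesk.Application):
+    # `record` is the raw Kafka record; `schema` is the schema id of the message.
+    print(record.topic, record.offset, schema)
+```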
+
+## Manual commit
+
+To accomplish a manual commit strategy yourself:
+
+```python
+app = kafkaesk.Application(auto_commit=False)
+
+@app.subscribe("content.*", "group_id")
+async def get_messages(data: ContentMessage, subscriber):
+ print(f"{data.foo}")
+ await subscriber.consumer.commit()
+```
+
+## SSL
+
+Add these values to your `kafka_settings`:
+
+- `ssl_context` - this should be a placeholder, as the SSL context is generally created within the application
+- `security_protocol` - one of SSL or PLAINTEXT
+- `sasl_mechanism` - one of PLAIN, GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER
+- `sasl_plain_username`
+- `sasl_plain_password`
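+
+A minimal sketch of passing these through `kafka_settings` via `Application.configure` (documented under the Worker section below); the broker address is a placeholder, and the SASL keys from the list can be added the same way:
+
+```python
+import ssl
+
+import kafkaesk
+
+app = kafkaesk.Application()
+
+# Placeholder broker address; the settings keys mirror the list above.
+app.configure(
+    kafka_servers=["broker.example.com:9093"],
+    kafka_settings={
+        "security_protocol": "SSL",
+        "ssl_context": ssl.create_default_context(),
+    },
+)
+```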
+
+## kafkaesk contract
+
+This is a library built around using Kafka; Kafka itself does not enforce these concepts.
+
+- Every message must provide a json schema
+- Messages produced will be validated against json schema
+- Each topic will have only one schema
+- A single schema can be used for multiple topics
+- Consumed message schema validation is up to the consumer
+- Messages will be consumed at least once. Considering this, your handling should be idempotent
+
+### Message format
+
+```json
+{
+ "schema": "schema_name:1",
+ "data": { ... }
+}
+```
+
+## Worker
+
+```bash
+kafkaesk mymodule:app --kafka-servers=localhost:9092
+```
+
+Options:
+
+ - --kafka-servers: comma separated list of kafka servers
+ - --kafka-settings: json encoded options to be passed to https://aiokafka.readthedocs.io/en/stable/api.html#aiokafkaconsumer-class
+ - --topic-prefix: prefix to use for topics
+ - --replication-factor: what replication factor topics should be created with. Defaults to min(number of servers, 3).
+
+### Application.publish
+
+- stream_id: str: name of stream to send data to
+- data: class that inherits from pydantic.BaseModel
+- key: Optional[bytes]: key for message if it needs one
+
+### Application.subscribe
+
+- stream_id: str: fnmatch pattern of streams to subscribe to
+- group: Optional[str]: consumer group id to use. Will use name of function if not provided
+
+### Application.schema
+
+- id: str: id of the schema to store
+- version: Optional[int]: version of schema to store
+- streams: Optional[List[str]]: if streams are known ahead of time, you can pre-create them before you push data
+- retention: Optional[int]: retention policy in seconds
+
+### Application.configure
+
+- kafka_servers: Optional[List[str]]: kafka servers to connect to
+- topic_prefix: Optional[str]: topic name prefix to subscribe to
+- kafka_settings: Optional[Dict[str, Any]]: additional aiokafka settings to pass in
+- replication_factor: Optional[int]: what replication factor topics should be created with. Defaults to min(number of servers, 3).
+- kafka_api_version: str: default `auto`
+- auto_commit: bool: default `True`
+- auto_commit_interval_ms: int: default `5000`
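+
+Putting the parameters above together, a hedged sketch of configuring and running an application programmatically (all values are placeholders):
+
+```python
+import kafkaesk
+
+app = kafkaesk.Application()
+
+# Placeholder values; parameter names mirror the Application.configure list above.
+app.configure(
+    kafka_servers=["localhost:9092"],
+    topic_prefix="myservice.",
+    replication_factor=1,
+    auto_commit=True,
+    auto_commit_interval_ms=5000,
+)
+
+if __name__ == "__main__":
+    kafkaesk.run(app)
+```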
+
+## Development
+
+### Requirements
+
+- [Docker](https://www.docker.com/)
+- [Poetry](https://python-poetry.org/)
+
+```bash
+poetry install
+```
+
+Run tests:
+
+```bash
+docker-compose up
+KAFKA=localhost:9092 poetry run pytest tests
+```
+
+## Extensions
+
+### Logging
+This extension includes classes to extend Python's logging framework to publish structured log messages to a Kafka topic.
+This extension is made up of three main components: a log record factory that extends `logging.LogRecord` and two custom `logging.Handler` implementations.
+
+See `logger.py` in the examples directory.
+
+#### Log Record
+`kafkaesk.ext.logging.record.factory` is a function that will return `kafkaesk.ext.logging.record.PydanticLogRecord` objects.
+The `factory()` function scans through any `args` passed to a logger and checks whether each item is an instance of `pydantic.BaseModel`.
+
+If an item is a `BaseModel` instance and `model._is_log_model` evaluates to `True`, the model is removed from `args` and added to `record._pydantic_data`.
+After that, `factory()` uses logging's existing logic to finish creating the log record.
+
+#### Handler
+This extension ships with two handlers capable of handling `kafkaesk.ext.logging.handler.PydanticLogModel` classes: `kafkaesk.ext.logging.handler.PydanticStreamHandler` and `kafkaesk.ext.logging.handler.PydanticKafkaeskHandler`.
+
+The stream handler is a thin wrapper around `logging.StreamHandler`; the signature is the same, the only difference being that the handler will attempt to convert any pydantic models it receives into a human-readable log message.
+
+The kafkaesk handler does a bit more work in the background.
+
+It takes two required arguments: a `kafkaesk.app.Application` instance and a stream name.
+
+Once initialized, any log emitted through the handler is placed on an internal queue; a worker task pulls logs from the queue and writes them to the specified topic.
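+
+A minimal sketch of wiring both handlers up, assuming the constructor signatures implied above (an `Application` plus a stream name for the Kafkaesk handler); the logger and stream names are placeholders:
+
+```python
+import logging
+
+import kafkaesk
+from kafkaesk.ext.logging.handler import PydanticKafkaeskHandler, PydanticStreamHandler
+
+app = kafkaesk.Application()
+
+logger = logging.getLogger("my.service")
+logger.setLevel(logging.INFO)
+# Assumed from the description above: an Application instance plus the stream to write to.
+logger.addHandler(PydanticKafkaeskHandler(app, "logs.my_service"))
+# Human-readable output for local development.
+logger.addHandler(PydanticStreamHandler())
+```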
+
+## Naming
+
+It's hard and "kafka" is already a fun name.
+Hopefully this library isn't literally "kafkaesque" for you.
+
+
+%package -n python3-kafkaesk
+Summary: Easy publish and subscribe to events with python and Kafka.
+Provides: python-kafkaesk
+BuildRequires: python3-devel
+BuildRequires: python3-setuptools
+BuildRequires: python3-pip
+%description -n python3-kafkaesk
+<!-- PROJECT LOGO -->
+<h1 align="center">
+ <br>
+ <img src="https://onna.com/wp-content/uploads/2020/03/h-onna-solid.png" alt="Onna Logo">
+</h1>
+
+<h2 align="center">kafkaesk</h2>
+
+<!-- TABLE OF CONTENTS -->
+## Table Of Contents
+
+- [About the Project](#about-the-project)
+- [Publish](#publish)
+- [Subscribe](#subscribe)
+- [Avoiding global object](#avoiding-global-object)
+- [Manual commit](#manual-commit)
+- [kafkaesk contract](#kafkaesk-contract)
+- [Worker](#worker)
+- [Development](#development)
+- [Extensions](#extensions)
+- [Naming](#naming)
+
+
+## About The Project
+
+This project is meant to help facilitate effortless publishing and subscribing to events with Python and Kafka.
+
+### Guiding principles
+
+- HTTP
+- Language agnostic
+- Contracts built on top of [Kafka](https://kafka.apache.org/)
+
+
+### Alternatives
+ - [aiokafka](https://aiokafka.readthedocs.io/en/stable/): can be complex to scale correctly
+ - [guillotina_kafka](https://github.com/onna/guillotina_kafka): complex, tied to [Guillotina](https://guillotina.readthedocs.io/en/latest/)
+ - [faust](https://faust.readthedocs.io/en/latest/): requires additional data layers, not language agnostic
+ - confluent kafka + avro: close, but ends up feeling like gRPC: schemas must be compiled per language, and there is no asyncio support.
+
+> Consider this Python project as syntactic sugar around these ideas.
+
+## Publish
+
+Examples use [pydantic](https://pydantic-docs.helpmanual.io/), but publishing can also be done with plain JSON.
+
+```python
+import kafkaesk
+from pydantic import BaseModel
+
+app = kafkaesk.Application()
+
+@app.schema("Content", version=1, retention=24 * 60 * 60)
+class ContentMessage(BaseModel):
+ foo: str
+
+
+async def foobar():
+ # ...
+ # doing something in an async func
+ await app.publish("content.edited.Resource", data=ContentMessage(foo="bar"))
+```
+
+A convenience method is available on the `subscriber` dependency instance; it propagates the headers of the consumed message to the one being published.
+
+```python
+import kafkaesk
+from pydantic import BaseModel
+
+app = kafkaesk.Application()
+
+@app.schema("Content", version=1, retention=24 * 60 * 60)
+class ContentMessage(BaseModel):
+ foo: str
+
+
+@app.subscribe("content.*", "group_id")
+async def get_messages(data: ContentMessage, subscriber):
+ print(f"{data.foo}")
+ # This will propagate `data` record headers
+ await subscriber.publish("content.edited.Resource", data=ContentMessage(foo="bar"))
+
+```
+
+## Subscribe
+
+```python
+import kafkaesk
+from pydantic import BaseModel
+
+app = kafkaesk.Application()
+
+@app.schema("Content", version=1, retention=24 * 60 * 60)
+class ContentMessage(BaseModel):
+ foo: str
+
+
+@app.subscribe("content.*", "group_id")
+async def get_messages(data: ContentMessage):
+ print(f"{data.foo}")
+
+```
+
+## Avoiding global object
+
+If you do not want to have global application configuration, you can lazily configure
+the application and register schemas/subscribers separately.
+
+```python
+import kafkaesk
+from pydantic import BaseModel
+
+router = kafkaesk.Router()
+
+@router.schema("Content", version=1, retention=24 * 60 * 60)
+class ContentMessage(BaseModel):
+ foo: str
+
+
+@router.subscribe("content.*", "group_id")
+async def get_messages(data: ContentMessage):
+ print(f"{data.foo}")
+
+
+if __name__ == "__main__":
+ app = kafkaesk.Application()
+ app.mount(router)
+ kafkaesk.run(app)
+
+```
+
+Optional consumer injected parameters:
+
+- schema: str
+- record: aiokafka.structs.ConsumerRecord
+- app: kafkaesk.app.Application
+- subscriber: kafkaesk.app.BatchConsumer
+
+Depending on the type annotation for the first parameter, you will get different data injected:
+
+- `async def get_messages(data: ContentMessage)`: parses the message into the pydantic model
+- `async def get_messages(data: bytes)`: gives the raw byte data
+- `async def get_messages(record: aiokafka.structs.ConsumerRecord)`: gives the Kafka record object
+- `async def get_messages(data)`: gives the raw JSON data from the message
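+
+As a hedged sketch, the handler below combines a raw `ConsumerRecord` annotation with the optional `schema` and `app` parameters listed above (the stream pattern, group id, and handler body are placeholders):
+
+```python
+import kafkaesk
+from aiokafka.structs import ConsumerRecord
+
+app = kafkaesk.Application()
+
+
+@app.subscribe("content.*", "audit_group")
+async def audit_messages(record: ConsumerRecord, schema: str, app: kafkaesk.Application):
+    # `record` is the raw Kafka record; `schema` is the schema id of the message.
+    print(record.topic, record.offset, schema)
+```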
+
+## Manual commit
+
+To accomplish a manual commit strategy yourself:
+
+```python
+app = kafkaesk.Application(auto_commit=False)
+
+@app.subscribe("content.*", "group_id")
+async def get_messages(data: ContentMessage, subscriber):
+ print(f"{data.foo}")
+ await subscriber.consumer.commit()
+```
+
+## SSL
+
+Add these values to your `kafka_settings`:
+
+- `ssl_context` - this should be a placeholder, as the SSL context is generally created within the application
+- `security_protocol` - one of SSL or PLAINTEXT
+- `sasl_mechanism` - one of PLAIN, GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER
+- `sasl_plain_username`
+- `sasl_plain_password`
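+
+A minimal sketch of passing these through `kafka_settings` via `Application.configure` (documented under the Worker section below); the broker address is a placeholder, and the SASL keys from the list can be added the same way:
+
+```python
+import ssl
+
+import kafkaesk
+
+app = kafkaesk.Application()
+
+# Placeholder broker address; the settings keys mirror the list above.
+app.configure(
+    kafka_servers=["broker.example.com:9093"],
+    kafka_settings={
+        "security_protocol": "SSL",
+        "ssl_context": ssl.create_default_context(),
+    },
+)
+```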
+
+## kafkaesk contract
+
+This is a library built around using Kafka; Kafka itself does not enforce these concepts.
+
+- Every message must provide a json schema
+- Messages produced will be validated against json schema
+- Each topic will have only one schema
+- A single schema can be used for multiple topics
+- Consumed message schema validation is up to the consumer
+- Messages will be consumed at least once. Considering this, your handling should be idempotent
+
+### Message format
+
+```json
+{
+ "schema": "schema_name:1",
+ "data": { ... }
+}
+```
+
+## Worker
+
+```bash
+kafkaesk mymodule:app --kafka-servers=localhost:9092
+```
+
+Options:
+
+ - --kafka-servers: comma separated list of kafka servers
+ - --kafka-settings: json encoded options to be passed to https://aiokafka.readthedocs.io/en/stable/api.html#aiokafkaconsumer-class
+ - --topic-prefix: prefix to use for topics
+ - --replication-factor: what replication factor topics should be created with. Defaults to min(number of servers, 3).
+
+### Application.publish
+
+- stream_id: str: name of stream to send data to
+- data: class that inherits from pydantic.BaseModel
+- key: Optional[bytes]: key for message if it needs one
+
+### Application.subscribe
+
+- stream_id: str: fnmatch pattern of streams to subscribe to
+- group: Optional[str]: consumer group id to use. Will use name of function if not provided
+
+### Application.schema
+
+- id: str: id of the schema to store
+- version: Optional[int]: version of schema to store
+- streams: Optional[List[str]]: if streams are known ahead of time, you can pre-create them before you push data
+- retention: Optional[int]: retention policy in seconds
+
+### Application.configure
+
+- kafka_servers: Optional[List[str]]: kafka servers to connect to
+- topic_prefix: Optional[str]: topic name prefix to subscribe to
+- kafka_settings: Optional[Dict[str, Any]]: additional aiokafka settings to pass in
+- replication_factor: Optional[int]: what replication factor topics should be created with. Defaults to min(number of servers, 3).
+- kafka_api_version: str: default `auto`
+- auto_commit: bool: default `True`
+- auto_commit_interval_ms: int: default `5000`
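+
+Putting the parameters above together, a hedged sketch of configuring and running an application programmatically (all values are placeholders):
+
+```python
+import kafkaesk
+
+app = kafkaesk.Application()
+
+# Placeholder values; parameter names mirror the Application.configure list above.
+app.configure(
+    kafka_servers=["localhost:9092"],
+    topic_prefix="myservice.",
+    replication_factor=1,
+    auto_commit=True,
+    auto_commit_interval_ms=5000,
+)
+
+if __name__ == "__main__":
+    kafkaesk.run(app)
+```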
+
+## Development
+
+### Requirements
+
+- [Docker](https://www.docker.com/)
+- [Poetry](https://python-poetry.org/)
+
+```bash
+poetry install
+```
+
+Run tests:
+
+```bash
+docker-compose up
+KAFKA=localhost:9092 poetry run pytest tests
+```
+
+## Extensions
+
+### Logging
+This extension includes classes to extend Python's logging framework to publish structured log messages to a Kafka topic.
+This extension is made up of three main components: a log record factory that extends `logging.LogRecord` and two custom `logging.Handler` implementations.
+
+See `logger.py` in the examples directory.
+
+#### Log Record
+`kafkaesk.ext.logging.record.factory` is a function that will return `kafkaesk.ext.logging.record.PydanticLogRecord` objects.
+The `factory()` function scans through any `args` passed to a logger and checks whether each item is an instance of `pydantic.BaseModel`.
+
+If an item is a `BaseModel` instance and `model._is_log_model` evaluates to `True`, the model is removed from `args` and added to `record._pydantic_data`.
+After that, `factory()` uses logging's existing logic to finish creating the log record.
+
+#### Handler
+This extension ships with two handlers capable of handling `kafkaesk.ext.logging.handler.PydanticLogModel` classes: `kafkaesk.ext.logging.handler.PydanticStreamHandler` and `kafkaesk.ext.logging.handler.PydanticKafkaeskHandler`.
+
+The stream handler is a thin wrapper around `logging.StreamHandler`; the signature is the same, the only difference being that the handler will attempt to convert any pydantic models it receives into a human-readable log message.
+
+The kafkaesk handler does a bit more work in the background.
+
+It takes two required arguments: a `kafkaesk.app.Application` instance and a stream name.
+
+Once initialized, any log emitted through the handler is placed on an internal queue; a worker task pulls logs from the queue and writes them to the specified topic.
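+
+A minimal sketch of wiring both handlers up, assuming the constructor signatures implied above (an `Application` plus a stream name for the Kafkaesk handler); the logger and stream names are placeholders:
+
+```python
+import logging
+
+import kafkaesk
+from kafkaesk.ext.logging.handler import PydanticKafkaeskHandler, PydanticStreamHandler
+
+app = kafkaesk.Application()
+
+logger = logging.getLogger("my.service")
+logger.setLevel(logging.INFO)
+# Assumed from the description above: an Application instance plus the stream to write to.
+logger.addHandler(PydanticKafkaeskHandler(app, "logs.my_service"))
+# Human-readable output for local development.
+logger.addHandler(PydanticStreamHandler())
+```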
+
+## Naming
+
+It's hard and "kafka" is already a fun name.
+Hopefully this library isn't literally "kafkaesque" for you.
+
+
+%package help
+Summary: Development documents and examples for kafkaesk
+Provides: python3-kafkaesk-doc
+%description help
+<!-- PROJECT LOGO -->
+<h1 align="center">
+ <br>
+ <img src="https://onna.com/wp-content/uploads/2020/03/h-onna-solid.png" alt="Onna Logo">
+</h1>
+
+<h2 align="center">kafkaesk</h2>
+
+<!-- TABLE OF CONTENTS -->
+## Table Of Contents
+
+- [About the Project](#about-the-project)
+- [Publish](#publish)
+- [Subscribe](#subscribe)
+- [Avoiding global object](#avoiding-global-object)
+- [Manual commit](#manual-commit)
+- [kafkaesk contract](#kafkaesk-contract)
+- [Worker](#worker)
+- [Development](#development)
+- [Extensions](#extensions)
+- [Naming](#naming)
+
+
+## About The Project
+
+This project is meant to help facilitate effortless publishing and subscribing to events with Python and Kafka.
+
+### Guiding principles
+
+- HTTP
+- Language agnostic
+- Contracts built on top of [Kafka](https://kafka.apache.org/)
+
+
+### Alternatives
+ - [aiokafka](https://aiokafka.readthedocs.io/en/stable/): can be complex to scale correctly
+ - [guillotina_kafka](https://github.com/onna/guillotina_kafka): complex, tied to [Guillotina](https://guillotina.readthedocs.io/en/latest/)
+ - [faust](https://faust.readthedocs.io/en/latest/): requires additional data layers, not language agnostic
+ - confluent kafka + avro: close, but ends up feeling like gRPC: schemas must be compiled per language, and there is no asyncio support.
+
+> Consider this Python project as syntactic sugar around these ideas.
+
+## Publish
+
+Examples use [pydantic](https://pydantic-docs.helpmanual.io/), but publishing can also be done with plain JSON.
+
+```python
+import kafkaesk
+from pydantic import BaseModel
+
+app = kafkaesk.Application()
+
+@app.schema("Content", version=1, retention=24 * 60 * 60)
+class ContentMessage(BaseModel):
+ foo: str
+
+
+async def foobar():
+ # ...
+ # doing something in an async func
+ await app.publish("content.edited.Resource", data=ContentMessage(foo="bar"))
+```
+
+A convenience method is available on the `subscriber` dependency instance; it propagates the headers of the consumed message to the one being published.
+
+```python
+import kafkaesk
+from pydantic import BaseModel
+
+app = kafkaesk.Application()
+
+@app.schema("Content", version=1, retention=24 * 60 * 60)
+class ContentMessage(BaseModel):
+ foo: str
+
+
+@app.subscribe("content.*", "group_id")
+async def get_messages(data: ContentMessage, subscriber):
+ print(f"{data.foo}")
+ # This will propagate `data` record headers
+ await subscriber.publish("content.edited.Resource", data=ContentMessage(foo="bar"))
+
+```
+
+## Subscribe
+
+```python
+import kafkaesk
+from pydantic import BaseModel
+
+app = kafkaesk.Application()
+
+@app.schema("Content", version=1, retention=24 * 60 * 60)
+class ContentMessage(BaseModel):
+ foo: str
+
+
+@app.subscribe("content.*", "group_id")
+async def get_messages(data: ContentMessage):
+ print(f"{data.foo}")
+
+```
+
+## Avoiding global object
+
+If you do not want to have global application configuration, you can lazily configure
+the application and register schemas/subscribers separately.
+
+```python
+import kafkaesk
+from pydantic import BaseModel
+
+router = kafkaesk.Router()
+
+@router.schema("Content", version=1, retention=24 * 60 * 60)
+class ContentMessage(BaseModel):
+ foo: str
+
+
+@router.subscribe("content.*", "group_id")
+async def get_messages(data: ContentMessage):
+ print(f"{data.foo}")
+
+
+if __name__ == "__main__":
+ app = kafkaesk.Application()
+ app.mount(router)
+ kafkaesk.run(app)
+
+```
+
+Optional consumer injected parameters:
+
+- schema: str
+- record: aiokafka.structs.ConsumerRecord
+- app: kafkaesk.app.Application
+- subscriber: kafkaesk.app.BatchConsumer
+
+Depending on the type annotation for the first parameter, you will get different data injected:
+
+- `async def get_messages(data: ContentMessage)`: parses the message into the pydantic model
+- `async def get_messages(data: bytes)`: gives the raw byte data
+- `async def get_messages(record: aiokafka.structs.ConsumerRecord)`: gives the Kafka record object
+- `async def get_messages(data)`: gives the raw JSON data from the message
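+
+As a hedged sketch, the handler below combines a raw `ConsumerRecord` annotation with the optional `schema` and `app` parameters listed above (the stream pattern, group id, and handler body are placeholders):
+
+```python
+import kafkaesk
+from aiokafka.structs import ConsumerRecord
+
+app = kafkaesk.Application()
+
+
+@app.subscribe("content.*", "audit_group")
+async def audit_messages(record: ConsumerRecord, schema: str, app: kafkaesk.Application):
+    # `record` is the raw Kafka record; `schema` is the schema id of the message.
+    print(record.topic, record.offset, schema)
+```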
+
+## Manual commit
+
+To accomplish a manual commit strategy yourself:
+
+```python
+app = kafkaesk.Application(auto_commit=False)
+
+@app.subscribe("content.*", "group_id")
+async def get_messages(data: ContentMessage, subscriber):
+ print(f"{data.foo}")
+ await subscriber.consumer.commit()
+```
+
+## SSL
+
+Add these values to your `kafka_settings`:
+
+- `ssl_context` - this should be a placeholder, as the SSL context is generally created within the application
+- `security_protocol` - one of SSL or PLAINTEXT
+- `sasl_mechanism` - one of PLAIN, GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER
+- `sasl_plain_username`
+- `sasl_plain_password`
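+
+A minimal sketch of passing these through `kafka_settings` via `Application.configure` (documented under the Worker section below); the broker address is a placeholder, and the SASL keys from the list can be added the same way:
+
+```python
+import ssl
+
+import kafkaesk
+
+app = kafkaesk.Application()
+
+# Placeholder broker address; the settings keys mirror the list above.
+app.configure(
+    kafka_servers=["broker.example.com:9093"],
+    kafka_settings={
+        "security_protocol": "SSL",
+        "ssl_context": ssl.create_default_context(),
+    },
+)
+```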
+
+## kafkaesk contract
+
+This is a library built around using Kafka; Kafka itself does not enforce these concepts.
+
+- Every message must provide a json schema
+- Messages produced will be validated against json schema
+- Each topic will have only one schema
+- A single schema can be used for multiple topics
+- Consumed message schema validation is up to the consumer
+- Messages will be consumed at least once. Considering this, your handling should be idempotent
+
+### Message format
+
+```json
+{
+ "schema": "schema_name:1",
+ "data": { ... }
+}
+```
+
+## Worker
+
+```bash
+kafkaesk mymodule:app --kafka-servers=localhost:9092
+```
+
+Options:
+
+ - --kafka-servers: comma separated list of kafka servers
+ - --kafka-settings: json encoded options to be passed to https://aiokafka.readthedocs.io/en/stable/api.html#aiokafkaconsumer-class
+ - --topic-prefix: prefix to use for topics
+ - --replication-factor: what replication factor topics should be created with. Defaults to min(number of servers, 3).
+
+### Application.publish
+
+- stream_id: str: name of stream to send data to
+- data: class that inherits from pydantic.BaseModel
+- key: Optional[bytes]: key for message if it needs one
+
+### Application.subscribe
+
+- stream_id: str: fnmatch pattern of streams to subscribe to
+- group: Optional[str]: consumer group id to use. Will use name of function if not provided
+
+### Application.schema
+
+- id: str: id of the schema to store
+- version: Optional[int]: version of schema to store
+- streams: Optional[List[str]]: if streams are known ahead of time, you can pre-create them before you push data
+- retention: Optional[int]: retention policy in seconds
+
+### Application.configure
+
+- kafka_servers: Optional[List[str]]: kafka servers to connect to
+- topic_prefix: Optional[str]: topic name prefix to subscribe to
+- kafka_settings: Optional[Dict[str, Any]]: additional aiokafka settings to pass in
+- replication_factor: Optional[int]: what replication factor topics should be created with. Defaults to min(number of servers, 3).
+- kafka_api_version: str: default `auto`
+- auto_commit: bool: default `True`
+- auto_commit_interval_ms: int: default `5000`
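+
+Putting the parameters above together, a hedged sketch of configuring and running an application programmatically (all values are placeholders):
+
+```python
+import kafkaesk
+
+app = kafkaesk.Application()
+
+# Placeholder values; parameter names mirror the Application.configure list above.
+app.configure(
+    kafka_servers=["localhost:9092"],
+    topic_prefix="myservice.",
+    replication_factor=1,
+    auto_commit=True,
+    auto_commit_interval_ms=5000,
+)
+
+if __name__ == "__main__":
+    kafkaesk.run(app)
+```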
+
+## Development
+
+### Requirements
+
+- [Docker](https://www.docker.com/)
+- [Poetry](https://python-poetry.org/)
+
+```bash
+poetry install
+```
+
+Run tests:
+
+```bash
+docker-compose up
+KAFKA=localhost:9092 poetry run pytest tests
+```
+
+## Extensions
+
+### Logging
+This extension includes classes to extend Python's logging framework to publish structured log messages to a Kafka topic.
+This extension is made up of three main components: a log record factory that extends `logging.LogRecord` and two custom `logging.Handler` implementations.
+
+See `logger.py` in the examples directory.
+
+#### Log Record
+`kafkaesk.ext.logging.record.factory` is a function that will return `kafkaesk.ext.logging.record.PydanticLogRecord` objects.
+The `factory()` function scans through any `args` passed to a logger and checks whether each item is an instance of `pydantic.BaseModel`.
+
+If an item is a `BaseModel` instance and `model._is_log_model` evaluates to `True`, the model is removed from `args` and added to `record._pydantic_data`.
+After that, `factory()` uses logging's existing logic to finish creating the log record.
+
+#### Handler
+This extension ships with two handlers capable of handling `kafkaesk.ext.logging.handler.PydanticLogModel` classes: `kafkaesk.ext.logging.handler.PydanticStreamHandler` and `kafkaesk.ext.logging.handler.PydanticKafkaeskHandler`.
+
+The stream handler is a thin wrapper around `logging.StreamHandler`; the signature is the same, the only difference being that the handler will attempt to convert any pydantic models it receives into a human-readable log message.
+
+The kafkaesk handler does a bit more work in the background.
+
+It takes two required arguments: a `kafkaesk.app.Application` instance and a stream name.
+
+Once initialized, any log emitted through the handler is placed on an internal queue; a worker task pulls logs from the queue and writes them to the specified topic.
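+
+A minimal sketch of wiring both handlers up, assuming the constructor signatures implied above (an `Application` plus a stream name for the Kafkaesk handler); the logger and stream names are placeholders:
+
+```python
+import logging
+
+import kafkaesk
+from kafkaesk.ext.logging.handler import PydanticKafkaeskHandler, PydanticStreamHandler
+
+app = kafkaesk.Application()
+
+logger = logging.getLogger("my.service")
+logger.setLevel(logging.INFO)
+# Assumed from the description above: an Application instance plus the stream to write to.
+logger.addHandler(PydanticKafkaeskHandler(app, "logs.my_service"))
+# Human-readable output for local development.
+logger.addHandler(PydanticStreamHandler())
+```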
+
+## Naming
+
+It's hard and "kafka" is already a fun name.
+Hopefully this library isn't literally "kafkaesque" for you.
+
+
+%prep
+%autosetup -n kafkaesk-0.7.7
+
+%build
+%py3_build
+
+%install
+%py3_install
+install -d -m755 %{buildroot}/%{_pkgdocdir}
+if [ -d doc ]; then cp -arf doc %{buildroot}/%{_pkgdocdir}; fi
+if [ -d docs ]; then cp -arf docs %{buildroot}/%{_pkgdocdir}; fi
+if [ -d example ]; then cp -arf example %{buildroot}/%{_pkgdocdir}; fi
+if [ -d examples ]; then cp -arf examples %{buildroot}/%{_pkgdocdir}; fi
+pushd %{buildroot}
+if [ -d usr/lib ]; then
+ find usr/lib -type f -printf "/%h/%f\n" >> filelist.lst
+fi
+if [ -d usr/lib64 ]; then
+ find usr/lib64 -type f -printf "/%h/%f\n" >> filelist.lst
+fi
+if [ -d usr/bin ]; then
+ find usr/bin -type f -printf "/%h/%f\n" >> filelist.lst
+fi
+if [ -d usr/sbin ]; then
+ find usr/sbin -type f -printf "/%h/%f\n" >> filelist.lst
+fi
+touch doclist.lst
+if [ -d usr/share/man ]; then
+ find usr/share/man -type f -printf "/%h/%f.gz\n" >> doclist.lst
+fi
+popd
+mv %{buildroot}/filelist.lst .
+mv %{buildroot}/doclist.lst .
+
+%files -n python3-kafkaesk -f filelist.lst
+%dir %{python3_sitelib}/*
+
+%files help -f doclist.lst
+%{_docdir}/*
+
+%changelog
+* Fri May 05 2023 Python_Bot <Python_Bot@openeuler.org> - 0.7.7-1
+- Package Spec generated
diff --git a/sources b/sources
new file mode 100644
index 0000000..1006585
--- /dev/null
+++ b/sources
@@ -0,0 +1 @@
+4b0174230810fa8585ca1214726a01fb kafkaesk-0.7.7.tar.gz