Diffstat (limited to 'python-nubium-utils.spec')
-rw-r--r--  python-nubium-utils.spec  1328
1 file changed, 1328 insertions, 0 deletions
diff --git a/python-nubium-utils.spec b/python-nubium-utils.spec
new file mode 100644
index 0000000..8854cc6
--- /dev/null
+++ b/python-nubium-utils.spec
@@ -0,0 +1,1328 @@
+%global _empty_manifest_terminate_build 0
+Name: python-nubium-utils
+Version: 4.2.0
+Release: 1
+Summary: Some Kafka utility functions and patterns for the nubium project
+License: MIT License
+URL: https://gitlab.corp.redhat.com/mkt-ops-de/nubium-utils.git
+Source0: https://mirrors.nju.edu.cn/pypi/web/packages/24/90/9c0cef4d47caf1d9bdc3e53c16edd2322592941cc79baacf73948110c0e9/nubium-utils-4.2.0.tar.gz
+BuildArch: noarch
+
+Requires: python3-fluvii
+Requires: python3-aiosfstream
+Requires: python3-dictdiffer
+Requires: python3-psutil
+Requires: python3-pydantic
+Requires: python3-box
+Requires: python3-dateutil
+Requires: python3-dotenv
+Requires: python3-eloqua-wrapper
+Requires: python3-pytz
+Requires: python3-requests
+Requires: python3-simple-salesforce
+Requires: python3-pip-tools
+Requires: python3-pytest
+Requires: python3-pytest-cov
+Requires: python3-time-machine
+Requires: python3-twine
+
+%description
+# Nubium Utils
+
+## Environment Variables
+
+Nubium-Utils now has a prefix for every environment variable it uses. Every environment
+variable related to Nubium-Utils starts with `NU_`.
+
+Similarly, anything DUDE-related will start with `DUDE_`.
+
+### Important Environment Variables
+
+Nubium-Utils relies on having these environment variables defined:
+- `NU_SCHEMA_REGISTRY_URL`: schema registry url
+- `NU_SCHEMA_REGISTRY_USERNAME`: schema registry username (if applicable)
+- `NU_SCHEMA_REGISTRY_PASSWORD`: schema registry password (if applicable)
+- `NU_KAFKA_CLUSTERS_CONFIGS_JSON`: JSON of clusters and their respective connection settings. EX:
+`'{"cluster_0": {"url": "url", "username": "un", "password": "pw"}, "cluster_1": {"url": "url", "username": "un", "password": "pw"}}'`
+- `NU_TOPIC_CONFIGS_JSON`: JSON of topics and their respective cluster name (references `NU_KAFKA_CLUSTERS_CONFIGS_JSON`) + (optional) create settings (see the parsing sketch after this list). EX:
+`'{"topic_a": {"cluster": "cluster_0", "configs": {"num_partitions": 2, "replication_factor": 2, "config": {"segment.ms": 120000}}}, "topic_b": {"cluster": "cluster_1"}}'`
+- `NU_HOSTNAME`: used for identifying unique consumer instances
+- `NU_APP_NAME`: used to define your consumer group
+- `NU_MP_PROJECT`: used for openshift deployment
+
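+Both `*_CONFIGS_JSON` variables are plain JSON strings, so a quick `json.loads` check can catch malformed values
+before an app starts. A minimal sketch (the values below are illustrative placeholders only):
+
+```python
+import json
+import os
+
+# Placeholder values for illustration; in a real deployment these come from your environment/OpenShift config.
+os.environ.setdefault(
+    "NU_KAFKA_CLUSTERS_CONFIGS_JSON",
+    '{"cluster_0": {"url": "localhost:9092", "username": "un", "password": "pw"}}')
+os.environ.setdefault(
+    "NU_TOPIC_CONFIGS_JSON",
+    '{"topic_a": {"cluster": "cluster_0", "configs": {"num_partitions": 2, "replication_factor": 2}}}')
+
+clusters = json.loads(os.environ["NU_KAFKA_CLUSTERS_CONFIGS_JSON"])
+topics = json.loads(os.environ["NU_TOPIC_CONFIGS_JSON"])
+
+# Every topic should point at a cluster that actually exists in the cluster map.
+for topic, settings in topics.items():
+    assert settings["cluster"] in clusters, f"{topic} references an unknown cluster"
+```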
+
+## Confluent-Kafka GTFO App Classes
+
+### Overview
+Nubium Utils now has an app framework, named `GTFO`, for managing confluent-kafka applications with exactly-once processing.
+The idea is to simplify confluent-kafka down to a kafka-streams-like interface, where consumer management is largely
+handled for you under the hood. This is particularly nice for exactly-once processing,
+which the class implements and uses by default.
+
+There are some other subclasses for some more specific use cases, namely `GtfoBatchApp` and `GtfoTableApp`, but
+the base class `GtfoApp` has been written with the intention of being easily extensible. The recommended
+approaches for extending it are described below.
+
+
+### The `Transaction` and `Gtfo` classes
+
+There are two essential classes you should understand before you dive in.
+
+The `Transaction` classes are actually the heart of the `GTFO` framework; they are what your business
+logic methods will actually be interacting with on a message-to-message basis. That being said, for most
+use cases, you won't even need to do much with them other than `Transaction.messages()` to get the currently consumed
+message, and `Transaction.produce()` to send a new message out.
+
+The `Gtfo`-based classes generally wrap/interface with the `Transaction` objects, generating a new one for
+every new transaction (which generally consists of consuming a message, producing the desired messages, and committing the
+consumed message). In general, you will really only use the `Gtfo` class to initialize everything, and it doesn't
+take much.
+
+Finally, as a general rule of thumb for both, there are not many methods to interact with on either class...
+on purpose! The functionality outlined here will likely cover >90% of use cases.
+
+
+### Initializing/running a `GtfoApp`: basic example
+
+NOTE: there is a lot that is managed for you via environment variables, so definitely take a look at the
+"**Important Environment Variables**" section to see what you should have defined before trying to run a `GTFO` app.
+
+There are two basic components you need to initialize an app at all:
+
+- `app_function`: the business logic of your application, which should accept exactly one argument: the `Transaction` object
+that will be handed to it for every message.
+
+- `consume_topics_list`: the topics you are consuming. Can be comma-separated string or python list.
+
+That's it! That said, this only covers an app that consumes. To produce, you will additionally need, at minimum:
+
+- `produce_topic_schema_dict`: a dict that maps {'topic_name': _schema_obj_}, where the _schema_obj_ is a valid avro
+schema dict.
+
+Then, to run the app, do:
+
+`GtfoApp.run()`
+
+Altogether, that might look something like this:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoApp
+
+useless_schema = {
+ "name": "CoolSchema",
+ "type": "record",
+ "fields": [
+ {
+ "name": "cool_field",
+ "type": "string"
+ }
+ ]
+}
+
+
+def useless_example_func(transaction, thing_inited_at_runtime):
+ msg = transaction.messages() # can also do transaction.value(); uses the confluent-kafka method with single messages
+ cool_message_out = msg.value()
+ cool_message_out['cool_field'] = thing_inited_at_runtime # 'cool value'
+ transaction.produce({'topic':'cool_topic_out', 'key': msg.key(), 'value': cool_message_out, 'headers': msg.headers()})
+
+init_at_runtime_thing = 'cool value' # can hand off objects you need to init at runtime, like an api session object
+
+gtfo_app = GtfoApp(
+ app_function=useless_example_func,
+ consume_topics_list=['test_topic_1', 'test_topic_2'],
+ produce_topic_schema_dict={'cool_topic_out': useless_schema},
+    app_function_arglist=[init_at_runtime_thing])  # optional! Here to show functionality.
+gtfo_app.run()
+```
+
+### Using `GtfoBatchApp` (plus `BatchTransaction`)
+
+Sometimes, you want to handle multiple messages at once, such as doing a bulk upload of data to an API.
+In this case, treating each message as a separate transaction doesn't make much sense! For this, we have `GtfoBatchApp`!
+
+We still rely on much of what `Gtfo` and `Transaction` lays out, but now we can specify how many messages should
+be consumed by default for a given transaction. Additionally, you can consume more messages on demand with the
+`BatchTransaction` object via `BatchTransaction.consume()` in your `app_function`, in case you'd like to do the normal
+consume pattern most of the time, but might need that to change on demand.
+
+Biggest thing to note here: all messages for that transaction will (just like `Transaction`) be accessible via
+`BatchTransaction.messages()`. Also, all of them get committed when you finish with that instance of `app_function` just
+like a typical singleton run of `app_function`, so keep that in mind!
+
+You can tweak the default batch size via `NU_CONSUMER_DEFAULT_BATCH_CONSUME_MAX_COUNT`; it defaults to 1.
+
+Here is an example:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoBatchApp
+from cool_api import bulk_upload
+
+useless_schema = {
+ "name": "CoolSchema",
+ "type": "record",
+ "fields": [
+ {
+ "name": "cool_field",
+ "type": "string"
+ }
+ ]
+}
+
+def prep_data(msg_val):
+ # do stuff
+ pass
+
+def useless_example_func(bulk_transaction):
+    msgs = bulk_transaction.messages()  # in a batch app this returns the list of consumed messages
+    last_msg = msgs[-1]
+    # I originally set my max to 10 via the env var...but with some logic, if a message says I should do a bigger batch...
+    if last_msg.headers()['needs_bigger_bulk'] == 'true':
+        bulk_transaction.consume(consume_max_count=500)  # allow me to raise that max to 500, and consume up to that (will consume up to 490 more)!
+    bulk_upload([prep_data(msg.value()) for msg in bulk_transaction.messages()])  # push everything that was consumed
+
+gtfo_app = GtfoBatchApp(
+ app_function=useless_example_func,
+ consume_topic='test_topic_in',
+ produce_topic_schema_dict={'cool_topic_out': useless_schema})
+gtfo_app.run()
+```
+
+
+### Using `GtfoTableApp` (plus `TableTransaction`)
+
+One of the more complex components of GTFO is the `GtfoTableApp`. It allows you to store state based on a
+kafka topic via a localized datastore. Basically, you can store whatever you want with respect to a given kafka key,
+and later reference/compare that data against a new version of that key.
+
+There are some functional caveats/limitations that come along with this feature set:
+- The app can only consume from one topic (you can make an upstream "funnel" app as needed).
+- You must create a topic named `{NU_APP_NAME}__changelog`, with the same partition count as the topic it
+would consume from.
+- You can only store/reference things based on the same key as the current message.
+- Each instance of your application needs to use the same volume.
+- Data needs to be stored as a JSON-serializable dict (it is stored as a string and `json.loads`-ed at runtime).
+
+With that in mind, set up is almost exactly the same as `GtfoApp`; here is an example:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoTableApp
+
+useless_schema = {
+ "name": "CoolSchema",
+ "type": "record",
+ "fields": [
+ {
+ "name": "cool_field",
+ "type": "string"
+ }
+ ]
+}
+
+
+def useless_example_func(transaction):
+ msg = transaction.messages() # can also do transaction.value(); uses the confluent-kafka method with single messages
+ cool_message_out = msg.value()
+ previous_message = transaction.read_table_entry()
+    # let's only do stuff if our previous version for the same key was "just cool"!
+ if previous_message['cool_field'] == 'just cool':
+ cool_message_out['cool_field'] = 'very cool now'
+ transaction.update_table_entry(cool_message_out) # we want to update the table with our new value. It does not do this automatically!
+ transaction.produce({'topic':'cool_topic_out', 'key': msg.key(), 'value': cool_message_out, 'headers': msg.headers()})
+
+
+gtfo_app = GtfoTableApp(
+ app_function=useless_example_func,
+ consume_topic='test_topic_in',
+ produce_topic_schema_dict={'cool_topic_out': useless_schema})
+gtfo_app.run()
+```
+
+### Extending `GtfoApp` and `Transaction`
+
+Of course, you can never cover _every_ use case! As such, each class was designed
+with extensibility in mind!
+
+Most often, what needs more customization is your consumption pattern (hence why there was already a bulk class!),
+and there is a relatively painless way to address that with minimal alterations.
+
+There is an `init` argument on `GtfoApp` named `transaction_type`; this allows you to easily pass an augmented
+version of `Transaction` with your changed consumption pattern, potentially without changing the `Transaction` class
+at all!
+
+Otherwise, hopefully things have been compartmentalized enough that you can just replace methods as needed;
+in practice, you'll usually need to adjust both classes a little, but likely mostly `Transaction`.
+
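+As a rough sketch of the `transaction_type` hook mentioned above (assuming `Transaction` can be imported from the same
+`transaction_utils` module; the overridden method below is purely illustrative, so check the actual class for the methods
+your consumption pattern needs):
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoApp, Transaction
+
+
+class LoggingTransaction(Transaction):
+    """Hypothetical subclass; override whichever consume-related methods your pattern needs."""
+    def messages(self):
+        msg = super().messages()
+        print(f"handling key: {msg.key()}")  # swap in real logging/behavior changes as needed
+        return msg
+
+
+def my_app_function(transaction):
+    print(transaction.messages().value())
+
+
+gtfo_app = GtfoApp(
+    app_function=my_app_function,
+    consume_topics_list=['test_topic_in'],
+    transaction_type=LoggingTransaction)  # your custom Transaction subclass
+gtfo_app.run()
+```
+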
+## Monitoring
+The monitoring utils enable metrics to be surfaced from the kafka applications
+so the Prometheus server can scrape them.
+The Prometheus server can't dynamically figure out pod IPs and scrape the
+services directly, so we're using a metrics cache instead.
+
+The metrics cache is a StatefulSet with 2 services assigned to it.
+One service is a normal service, with a unique cluster IP address.
+The prometheus server scrapes this service endpoint.
+The other service doesn't have a cluster IP,
+which means that the monitoring utility can find the IP addresses of each
+of the backing pods, and send metrics to all of the pods.
+This setup gives us high-availability guarantees.
+
+The Monitoring utils are organized into two classes, `MetricsPusher` and `MetricsManager`.
+
+The `MetricsManager` is a container for all of the metrics for the app,
+and contains convenience methods for the 3 standardized metrics.
+These metrics are
+- `messages_consumed`: The number of messages consumed by the app
+- `messages_produced`: The number of messages produced by the app
+- `message_errors`: The number of exceptions caught in the app (labeled by type)
+
+The `MetricsPusher` handles pushing the application's metrics to the metrics cache.
+It determines the list of IP addresses for all of the metrics cache pods,
+and sends the current metrics values for all of the metrics.
+
+### Metric names and labels
+The names of the metrics in Prometheus are the same as their parameter names:
+- `messages_consumed`
+- `messages_produced`
+- `message_errors`
+
+Two labels exist for every metric:
+- `app`: The name of the microservice the metric came from
+- `job`: The name of the individual pod the metric came from
+
+The `message_errors` metric also has another label:
+- `exception`: The name of the exception that triggered the metric
+
+### Monitoring Setup Examples
+The initialization and update loop for application monitoring will differ
+from application to application based on their architecture.
+The following examples should cover the standard designs we use.
+
+#### Default Kafka Client Application
+A Kafka application that directly relies on interacting with Producer or
+Consumer clients should have its monitoring classes set up and its
+pushing thread started in the main run function and passed to the loop, as follows:
+```python
+import os
+
+from confluent_kafka import Consumer, Producer
+from nubium_utils.metrics import MetricsManager, MetricsPusher, start_pushing_metrics
+
+def run_function():
+
+    # the real clients need librdkafka config dicts (e.g. bootstrap.servers, and group.id for the consumer); omitted here for brevity
+    consumer = Consumer()
+    producer = Producer()
+
+ metrics_pusher = MetricsPusher(
+ os.environ['HOSTNAME'],
+ os.environ['METRICS_SERVICE_NAME'],
+ os.environ['METRICS_SERVICE_PORT'],
+ os.environ['METRICS_POD_PORT'])
+ metrics_manager = MetricsManager(job=os.environ['HOSTNAME'], app=os.environ['APP_NAME'], metrics_pusher=metrics_pusher)
+ start_pushing_metrics(metrics_manager, int(os.environ['METRICS_PUSH_RATE']))
+
+ try:
+ while True:
+ loop_function(consumer, producer, metrics_manager=metrics_manager)
+ finally:
+ consumer.close()
+
+```
+
+The `consume_message()` function from this library expects a metrics_manager object
+as an argument, so that it can increment the `messages_consumed` metric.
+
+The `messages_produced` metric needs to be incremented by the application itself
+whenever a Kafka message is produced. The convenience method on the metrics_manager,
+`inc_messages_produced()`, makes this easier,
+since it automatically adds the necessary labels to the metric.
+
+The application also needs to be set to increment the `message_errors` metric
+whenever an exception is caught.
+
+An example loop function might look like this:
+```python
+import os
+import logging
+
+from nubium_utils import consume_message
+from nubium_utils.custom_exceptions import NoMessageError
+
+
+def loop_function(consumer, producer, metrics_manager):
+ try:
+ message = consume_message(consumer, int(os.environ['CONSUMER_POLL_TIMEOUT']), metrics_manager)
+ outgoing_key = message.value()['email_address']
+        producer.produce(topic='outgoing_topic', key=outgoing_key, value=message.value())
+ metrics_manager.inc_messages_produced(1)
+ except NoMessageError:
+ pass
+ except KeyError as error:
+ metrics_manager.inc_message_errors(error)
+ logging.debug('Message missing email address')
+
+
+```
+
+#### Flask Kafka Application
+The setup becomes a little bit different with a Flask application.
+The metrics_manager should be accessible through the app's configuration,
+so that it can be accessed in route functions.
+
+The preferred method for error monitoring is to hook into the built-in
+Flask error handling loop, using the `@app.errorhandler` decorator.
+Here is an example `create_app()` function:
+
+```python
+import flask
+from werkzeug.exceptions import HTTPException
+
+from .forms_producer_app import forms_producer
+from .util_blueprint import app_util_bp
+
+def create_app(config):
+ """
+ Creates app from config and needed blueprints
+ :param config: (Config) object used to configure the flask app
+ :return: (flask.App) the application object
+ """
+ app = flask.Flask(__name__)
+ app.config.from_object(config)
+
+ app.register_blueprint(forms_producer)
+ app.register_blueprint(app_util_bp)
+
+ @app.errorhandler(HTTPException)
+ def handle_exception(e):
+ """
+ Increment error gauge on metrics_manager before returning error message
+ """
+ response = e.get_response()
+ response.data = f'{e.code}:{e.name} - {e.description}'
+ app.config['MONITOR'].inc_message_errors(e)
+ return response
+
+ @app.errorhandler(Exception)
+ def unhandled_exception(error):
+ app.logger.error(f'Unhandled exception: {error}')
+ app.config['MONITOR'].inc_message_errors(error)
+ return f'Unhandled exception: {error}', 500
+
+ return app
+```
+
+The route functions for produced messages should increase the `messages_produced`
+metric when necessary.
+Example:
+```python
+from flask import current_app, jsonify, request
+
+# AUTH, LOGGER, forms_producer (the Blueprint), and produce_message_callback are defined elsewhere in the app
+@forms_producer.route('/', methods=["POST"])
+@AUTH.login_required
+def handle_form():
+ """
+ Ingests a dynamic form from Eloqua and produces it to the topic
+ """
+ values = request.json
+ string_values = {key: str(value) for key, value in values.items()}
+ LOGGER.debug(f'Processing form: {values}')
+
+ current_app.config['PRODUCER'].produce(
+ topic=current_app.config['TOPIC'],
+ key=values['C_EmailAddress'],
+ value={'form_data': string_values},
+ on_delivery=produce_message_callback
+ )
+ current_app.config['MONITOR'].inc_messages_produced(1)
+
+ return jsonify(success=True)
+```
+
+
+%package -n python3-nubium-utils
+Summary: Some Kafka utility functions and patterns for the nubium project
+Provides: python-nubium-utils
+BuildRequires: python3-devel
+BuildRequires: python3-setuptools
+BuildRequires: python3-pip
+%description -n python3-nubium-utils
+# Nubium Utils
+
+## Environment Variables
+
+Nubium-Utils now has a prefix for every environment variable it uses. Every environment
+variable related to Nubium-Utils starts with `NU_`.
+
+Similarly, anything DUDE-related will start with `DUDE_`.
+
+### Important Environment Variables
+
+Nubium-Utils relies on having these environment variables defined:
+- `NU_SCHEMA_REGISTRY_URL`: schema registry url
+- `NU_SCHEMA_REGISTRY_USERNAME`: schema registry username (if applicable)
+- `NU_SCHEMA_REGISTRY_PASSWORD`: schema registry password (if applicable)
+- `NU_KAFKA_CLUSTERS_CONFIGS_JSON`: JSON of clusters and their respective connection settings. EX:
+`'{"cluster_0": {"url": "url", "username": "un", "password": "pw"}, "cluster_1": {"url": "url", "username": "un", "password": "pw"}}'`
+- `NU_TOPIC_CONFIGS_JSON`: JSON of topics and their respective cluster name (references `NU_KAFKA_CLUSTERS_CONFIGS_JSON`) + (optional) create settings (see the parsing sketch after this list). EX:
+`'{"topic_a": {"cluster": "cluster_0", "configs": {"num_partitions": 2, "replication_factor": 2, "config": {"segment.ms": 120000}}}, "topic_b": {"cluster": "cluster_1"}}'`
+- `NU_HOSTNAME`: used for identifying unique consumer instances
+- `NU_APP_NAME`: used to define your consumer group
+- `NU_MP_PROJECT`: used for openshift deployment
+
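+Both `*_CONFIGS_JSON` variables are plain JSON strings, so a quick `json.loads` check can catch malformed values
+before an app starts. A minimal sketch (the values below are illustrative placeholders only):
+
+```python
+import json
+import os
+
+# Placeholder values for illustration; in a real deployment these come from your environment/OpenShift config.
+os.environ.setdefault(
+    "NU_KAFKA_CLUSTERS_CONFIGS_JSON",
+    '{"cluster_0": {"url": "localhost:9092", "username": "un", "password": "pw"}}')
+os.environ.setdefault(
+    "NU_TOPIC_CONFIGS_JSON",
+    '{"topic_a": {"cluster": "cluster_0", "configs": {"num_partitions": 2, "replication_factor": 2}}}')
+
+clusters = json.loads(os.environ["NU_KAFKA_CLUSTERS_CONFIGS_JSON"])
+topics = json.loads(os.environ["NU_TOPIC_CONFIGS_JSON"])
+
+# Every topic should point at a cluster that actually exists in the cluster map.
+for topic, settings in topics.items():
+    assert settings["cluster"] in clusters, f"{topic} references an unknown cluster"
+```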
+
+## Confluent-Kafka GTFO App Classes
+
+### Overview
+Nubium Utils now has an app framework, named `GTFO`, for managing confluent-kafka applications with exactly-once processing.
+The idea is to simplify confluent-kafka down to a kafka-streams-like interface, where consumer management is largely
+handled for you under the hood. This is particularly nice for exactly-once processing,
+which the class implements and uses by default.
+
+There are some other subclasses for some more specific use cases, namely `GtfoBatchApp` and `GtfoTableApp`, but
+the base class `GtfoApp` has been written with the intention of being easily extensible. The recommended
+approaches for extending it are described below.
+
+
+### The `Transaction` and `Gtfo` classes
+
+There are two essential classes you should understand before you dive in.
+
+The `Transaction` classes are actually the heart of the `GTFO` framework; they are what your business
+logic methods will actually be interacting with on a message-to-message basis. That being said, for most
+use cases, you won't even need to do much with them other than `Transaction.messages()` to get the currently consumed
+message, and `Transaction.produce()` to send a new message out.
+
+The `Gtfo`-based classes generally wrap/interface with the `Transaction` objects, generating a new one for
+every new transaction (which generally consists of consuming a message, producing the desired messages, and committing the
+consumed message). In general, you will really only use the `Gtfo` class to initialize everything, and it doesn't
+take much.
+
+Finally, as a general rule of thumb for both, there are not many methods to interact with on either class...
+on purpose! The functionality outlined here will likely cover >90% of use cases.
+
+
+### Initializing/running a `GtfoApp`: basic example
+
+NOTE: there is a lot that is managed for you via environment variables, so definitely take a look at the
+"**Important Environment Variables**" section to see what you should have defined before trying to run a `GTFO` app.
+
+There are two basic components you need to initialize an app at all:
+
+- `app_function`: the business logic of your application, which should accept exactly one argument: the `Transaction` object
+that will be handed to it for every message.
+
+- `consume_topics_list`: the topics you are consuming. Can be comma-separated string or python list.
+
+That's it! That said, this only covers an app that consumes. To produce, you will additionally need, at minimum:
+
+- `produce_topic_schema_dict`: a dict that maps {'topic_name': _schema_obj_}, where the _schema_obj_ is a valid avro
+schema dict.
+
+Then, to run the app, do:
+
+`GtfoApp.run()`
+
+Altogether, that might look something like this:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoApp
+
+useless_schema = {
+ "name": "CoolSchema",
+ "type": "record",
+ "fields": [
+ {
+ "name": "cool_field",
+ "type": "string"
+ }
+ ]
+}
+
+
+def useless_example_func(transaction, thing_inited_at_runtime):
+ msg = transaction.messages() # can also do transaction.value(); uses the confluent-kafka method with single messages
+ cool_message_out = msg.value()
+ cool_message_out['cool_field'] = thing_inited_at_runtime # 'cool value'
+ transaction.produce({'topic':'cool_topic_out', 'key': msg.key(), 'value': cool_message_out, 'headers': msg.headers()})
+
+init_at_runtime_thing = 'cool value' # can hand off objects you need to init at runtime, like an api session object
+
+gtfo_app = GtfoApp(
+ app_function=useless_example_func,
+ consume_topics_list=['test_topic_1', 'test_topic_2'],
+ produce_topic_schema_dict={'cool_topic_out': useless_schema},
+    app_function_arglist=[init_at_runtime_thing])  # optional! Here to show functionality.
+gtfo_app.run()
+```
+
+### Using `GtfoBatchApp` (plus `BatchTransaction`)
+
+Sometimes, you want to handle multiple messages at once, such as doing a bulk upload of data to an API.
+In this case, treating each message as a separate transaction doesn't make much sense! For this, we have `GtfoBatchApp`!
+
+We still rely on much of what `Gtfo` and `Transaction` lays out, but now we can specify how many messages should
+be consumed by default for a given transaction. Additionally, you can consume more messages on demand with the
+`BatchTransaction` object via `BatchTransaction.consume()` in your `app_function`, in case you'd like to do the normal
+consume pattern most of the time, but might need that to change on demand.
+
+Biggest thing to note here: all messages for that transaction will (just like `Transaction`) be accessible via
+`BatchTransaction.messages()`. Also, all of them get committed when you finish with that instance of `app_function` just
+like a typical singleton run of `app_function`, so keep that in mind!
+
+You can tweak the default batch size via `NU_CONSUMER_DEFAULT_BATCH_CONSUME_MAX_COUNT`; it defaults to 1.
+
+Here is an example:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoBatchApp
+from cool_api import bulk_upload
+
+useless_schema = {
+ "name": "CoolSchema",
+ "type": "record",
+ "fields": [
+ {
+ "name": "cool_field",
+ "type": "string"
+ }
+ ]
+}
+
+def prep_data(msg_val):
+ # do stuff
+ pass
+
+def useless_example_func(bulk_transaction):
+    msgs = bulk_transaction.messages()  # in a batch app this returns the list of consumed messages
+    last_msg = msgs[-1]
+    # I originally set my max to 10 via the env var...but with some logic, if a message says I should do a bigger batch...
+    if last_msg.headers()['needs_bigger_bulk'] == 'true':
+        bulk_transaction.consume(consume_max_count=500)  # allow me to raise that max to 500, and consume up to that (will consume up to 490 more)!
+    bulk_upload([prep_data(msg.value()) for msg in bulk_transaction.messages()])  # push everything that was consumed
+
+gtfo_app = GtfoBatchApp(
+ app_function=useless_example_func,
+ consume_topic='test_topic_in',
+ produce_topic_schema_dict={'cool_topic_out': useless_schema})
+gtfo_app.run()
+```
+
+
+### Using `GtfoTableApp` (plus `TableTransaction`)
+
+One of the more complex components of GTFO is the `GtfoTableApp`. It allows you to store state based on a
+kafka topic via a localized datastore. Basically, you can store whatever you want with respect to a given kafka key,
+and later reference/compare that data against a new version of that key.
+
+There are some functional caveats/limitations that come along with this feature set:
+- The app can only consume from one topic (you can make an upstream "funnel" app as needed).
+- You must create a topic named `{NU_APP_NAME}__changelog`, with the same partition count as the topic it
+would consume from.
+- You can only store/reference things based on the same key as the current message.
+- Each instance of your application needs to use the same volume.
+- Data needs to be stored as a JSON-serializable dict (it is stored as a string and `json.loads`-ed at runtime).
+
+With that in mind, set up is almost exactly the same as `GtfoApp`; here is an example:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoTableApp
+
+useless_schema = {
+ "name": "CoolSchema",
+ "type": "record",
+ "fields": [
+ {
+ "name": "cool_field",
+ "type": "string"
+ }
+ ]
+}
+
+
+def useless_example_func(transaction):
+ msg = transaction.messages() # can also do transaction.value(); uses the confluent-kafka method with single messages
+ cool_message_out = msg.value()
+ previous_message = transaction.read_table_entry()
+    # let's only do stuff if our previous version for the same key was "just cool"!
+ if previous_message['cool_field'] == 'just cool':
+ cool_message_out['cool_field'] = 'very cool now'
+ transaction.update_table_entry(cool_message_out) # we want to update the table with our new value. It does not do this automatically!
+ transaction.produce({'topic':'cool_topic_out', 'key': msg.key(), 'value': cool_message_out, 'headers': msg.headers()})
+
+
+gtfo_app = GtfoTableApp(
+ app_function=useless_example_func,
+ consume_topic='test_topic_in',
+ produce_topic_schema_dict={'cool_topic_out': useless_schema})
+gtfo_app.run()
+```
+
+### Extending `GtfoApp` and `Transaction`
+
+Of course, you can never cover _every_ use case! As such, each class was designed
+with extensibility in mind!
+
+Most often, what needs more customization is your consumption pattern (hence why there was already a bulk class!),
+and there is a relatively painless way to address that with minimal alterations.
+
+There is an `init` argument on `GtfoApp` named `transaction_type`; this allows you to easily pass an augmented
+version of `Transaction` with your changed consumption pattern, potentially without changing the `Transaction` class
+at all!
+
+Otherwise, hopefully things have been compartmentalized enough that you can just replace methods as needed;
+in practice, you'll usually need to adjust both classes a little, but likely mostly `Transaction`.
+
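+As a rough sketch of the `transaction_type` hook mentioned above (assuming `Transaction` can be imported from the same
+`transaction_utils` module; the overridden method below is purely illustrative, so check the actual class for the methods
+your consumption pattern needs):
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoApp, Transaction
+
+
+class LoggingTransaction(Transaction):
+    """Hypothetical subclass; override whichever consume-related methods your pattern needs."""
+    def messages(self):
+        msg = super().messages()
+        print(f"handling key: {msg.key()}")  # swap in real logging/behavior changes as needed
+        return msg
+
+
+def my_app_function(transaction):
+    print(transaction.messages().value())
+
+
+gtfo_app = GtfoApp(
+    app_function=my_app_function,
+    consume_topics_list=['test_topic_in'],
+    transaction_type=LoggingTransaction)  # your custom Transaction subclass
+gtfo_app.run()
+```
+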
+## Monitoring
+The monitoring utils enable metrics to be surfaced from the kafka applications
+so the Prometheus server can scrape them.
+The Prometheus server can't dynamically figure out pod IPs and scrape the
+services directly, so we're using a metrics cache instead.
+
+The metrics cache is a StatefulSet with 2 services assigned to it.
+One service is a normal service, with a unique cluster IP address.
+The prometheus server scrapes this service endpoint.
+The other service doesn't have a cluster IP,
+which means that the monitoring utility can find the IP addresses of each
+of the backing pods, and send metrics to all of the pods.
+This setup gives us high-availability guarantees.
+
+The Monitoring utils are organized into two classes, `MetricsPusher` and `MetricsManager`.
+
+The `MetricsManager` is a container for all of the metrics for the app,
+and contains convenience methods for the 3 standardized metrics.
+These metrics are
+- `messages_consumed`: The number of messages consumed by the app
+- `messages_produced`: The number of messages produced by the app
+- `message_errors`: The number of exceptions caught in the app (labeled by type)
+
+The `MetricsPusher` handles pushing the application's metrics to the metrics cache.
+It determines the list of IP addresses for all of the metrics cache pods,
+and sends the current metrics values for all of the metrics.
+
+### Metric names and labels
+The names of the metrics in Prometheus are the same as their parameter names:
+- `messages_consumed`
+- `messages_produced`
+- `message_errors`
+
+Two labels exist for every metric:
+- `app`: The name of the microservice the metric came from
+- `job`: The name of the individual pod the metric came from
+
+The `message_errors` metric also has another label:
+- `exception`: The name of the exception that triggered the metric
+
+### Monitoring Setup Examples
+The initialization and update loop for application monitoring will differ
+from application to application based on their architecture.
+The following examples should cover the standard designs we use.
+
+#### Default Kafka Client Application
+A Kafka application that directly relies on interacting with Producer or
+Consumer clients should have its monitoring classes set up and its
+pushing thread started in the main run function and passed to the loop, as follows:
+```python
+import os
+
+from confluent_kafka import Consumer, Producer
+from nubium_utils.metrics import MetricsManager, MetricsPusher, start_pushing_metrics
+
+def run_function():
+
+    # the real clients need librdkafka config dicts (e.g. bootstrap.servers, and group.id for the consumer); omitted here for brevity
+    consumer = Consumer()
+    producer = Producer()
+
+ metrics_pusher = MetricsPusher(
+ os.environ['HOSTNAME'],
+ os.environ['METRICS_SERVICE_NAME'],
+ os.environ['METRICS_SERVICE_PORT'],
+ os.environ['METRICS_POD_PORT'])
+ metrics_manager = MetricsManager(job=os.environ['HOSTNAME'], app=os.environ['APP_NAME'], metrics_pusher=metrics_pusher)
+ start_pushing_metrics(metrics_manager, int(os.environ['METRICS_PUSH_RATE']))
+
+ try:
+ while True:
+ loop_function(consumer, producer, metrics_manager=metrics_manager)
+ finally:
+ consumer.close()
+
+```
+
+The `consume_message()` function from this library expects a metrics_manager object
+as an argument, so that it can increment the `messages_consumed` metric.
+
+The `messages_produced` metric needs to be incremented by the application itself
+whenever a Kafka message is produced. The convenience method on the metrics_manager,
+`inc_messages_produced()`, makes this easier,
+since it automatically adds the necessary labels to the metric.
+
+The application also needs to be set to increment the `message_errors` metric
+whenever an exception is caught.
+
+An example loop function might look like this:
+```python
+import os
+import logging
+
+from nubium_utils import consume_message
+from nubium_utils.custom_exceptions import NoMessageError
+
+
+def loop_function(consumer, producer, metrics_manager):
+ try:
+ message = consume_message(consumer, int(os.environ['CONSUMER_POLL_TIMEOUT']), metrics_manager)
+ outgoing_key = message.value()['email_address']
+        producer.produce(topic='outgoing_topic', key=outgoing_key, value=message.value())
+ metrics_manager.inc_messages_produced(1)
+ except NoMessageError:
+ pass
+ except KeyError as error:
+ metrics_manager.inc_message_errors(error)
+ logging.debug('Message missing email address')
+
+
+```
+
+#### Flask Kafka Application
+The setup becomes a little bit different with a Flask application.
+The metrics_manager should be accessible through the app's configuration,
+so that it can be accessed in route functions.
+
+The preferred method for error monitoring is to hook into the built-in
+Flask error handling loop, using the `@app.errorhandler` decorator.
+Here is an example `create_app()` function:
+
+```python
+import flask
+from werkzeug.exceptions import HTTPException
+
+from .forms_producer_app import forms_producer
+from .util_blueprint import app_util_bp
+
+def create_app(config):
+ """
+ Creates app from config and needed blueprints
+ :param config: (Config) object used to configure the flask app
+ :return: (flask.App) the application object
+ """
+ app = flask.Flask(__name__)
+ app.config.from_object(config)
+
+ app.register_blueprint(forms_producer)
+ app.register_blueprint(app_util_bp)
+
+ @app.errorhandler(HTTPException)
+ def handle_exception(e):
+ """
+ Increment error gauge on metrics_manager before returning error message
+ """
+ response = e.get_response()
+ response.data = f'{e.code}:{e.name} - {e.description}'
+ app.config['MONITOR'].inc_message_errors(e)
+ return response
+
+ @app.errorhandler(Exception)
+ def unhandled_exception(error):
+ app.logger.error(f'Unhandled exception: {error}')
+ app.config['MONITOR'].inc_message_errors(error)
+ return f'Unhandled exception: {error}', 500
+
+ return app
+```
+
+The route functions for produced messages should increase the `messages_produced`
+metric when necessary.
+Example:
+```python
+from flask import current_app, jsonify, request
+
+# AUTH, LOGGER, forms_producer (the Blueprint), and produce_message_callback are defined elsewhere in the app
+@forms_producer.route('/', methods=["POST"])
+@AUTH.login_required
+def handle_form():
+ """
+ Ingests a dynamic form from Eloqua and produces it to the topic
+ """
+ values = request.json
+ string_values = {key: str(value) for key, value in values.items()}
+ LOGGER.debug(f'Processing form: {values}')
+
+ current_app.config['PRODUCER'].produce(
+ topic=current_app.config['TOPIC'],
+ key=values['C_EmailAddress'],
+ value={'form_data': string_values},
+ on_delivery=produce_message_callback
+ )
+ current_app.config['MONITOR'].inc_messages_produced(1)
+
+ return jsonify(success=True)
+```
+
+
+%package help
+Summary: Development documents and examples for nubium-utils
+Provides: python3-nubium-utils-doc
+%description help
+# Nubium Utils
+
+## Environment Variables
+
+Nubium-Utils now has a prefix for every environment variable it uses. Every environment
+variable related to Nubium-Utils starts with `NU_`.
+
+Similarly, anything DUDE-related will start with `DUDE_`.
+
+### Important Environment Variables
+
+Nubium-Utils relies on having these environment variables defined:
+- `NU_SCHEMA_REGISTRY_URL`: schema registry url
+- `NU_SCHEMA_REGISTRY_USERNAME`: schema registry username (if applicable)
+- `NU_SCHEMA_REGISTRY_PASSWORD`: schema registry password (if applicable)
+- `NU_KAFKA_CLUSTERS_CONFIGS_JSON`: JSON of clusters and their respective connection settings. EX:
+`'{"cluster_0": {"url": "url", "username": "un", "password": "pw"}, "cluster_1": {"url": "url", "username": "un", "password": "pw"}}'`
+- `NU_TOPIC_CONFIGS_JSON`: JSON of topics and their respective cluster name (references `NU_KAFKA_CLUSTERS_CONFIGS_JSON`) + (optional) create settings (see the parsing sketch after this list). EX:
+`'{"topic_a": {"cluster": "cluster_0", "configs": {"num_partitions": 2, "replication_factor": 2, "config": {"segment.ms": 120000}}}, "topic_b": {"cluster": "cluster_1"}}'`
+- `NU_HOSTNAME`: used for identifying unique consumer instances
+- `NU_APP_NAME`: used to define your consumer group
+- `NU_MP_PROJECT`: used for openshift deployment
+
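+Both `*_CONFIGS_JSON` variables are plain JSON strings, so a quick `json.loads` check can catch malformed values
+before an app starts. A minimal sketch (the values below are illustrative placeholders only):
+
+```python
+import json
+import os
+
+# Placeholder values for illustration; in a real deployment these come from your environment/OpenShift config.
+os.environ.setdefault(
+    "NU_KAFKA_CLUSTERS_CONFIGS_JSON",
+    '{"cluster_0": {"url": "localhost:9092", "username": "un", "password": "pw"}}')
+os.environ.setdefault(
+    "NU_TOPIC_CONFIGS_JSON",
+    '{"topic_a": {"cluster": "cluster_0", "configs": {"num_partitions": 2, "replication_factor": 2}}}')
+
+clusters = json.loads(os.environ["NU_KAFKA_CLUSTERS_CONFIGS_JSON"])
+topics = json.loads(os.environ["NU_TOPIC_CONFIGS_JSON"])
+
+# Every topic should point at a cluster that actually exists in the cluster map.
+for topic, settings in topics.items():
+    assert settings["cluster"] in clusters, f"{topic} references an unknown cluster"
+```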
+
+## Confluent-Kafka GTFO App Classes
+
+### Overview
+Nubium Utils now has an app framework, named `GTFO`, for managing confluent-kafka applications with exactly-once processing.
+The idea is to simplify confluent-kafka down to a kafka-streams-like interface, where consumer management is largely
+handled for you under the hood. This is particularly nice for exactly-once processing,
+which the class implements and uses by default.
+
+There are some other subclasses for some more specific use cases, namely `GtfoBatchApp` and `GtfoTableApp`, but
+the base class `GtfoApp` has been written with the intention of being easily extensible. The recommended
+approaches for extending it are described below.
+
+
+### The `Transaction` and `Gtfo` classes
+
+There are two essential classes you should understand before you dive in.
+
+The `Transaction` classes are actually the heart of the `GTFO` framework; they are what your business
+logic methods will actually be interacting with on a message-to-message basis. That being said, for most
+use cases, you won't even need to do much with them other than `Transaction.messages()` to get the currently consumed
+message, and `Transaction.produce()` to send a new message out.
+
+The `Gtfo`-based classes generally wrap/interface with the `Transaction` objects, generating a new one for
+every new transaction (which generally consists of consuming a message, producing the desired messages, and committing the
+consumed message). In general, you will really only use the `Gtfo` class to initialize everything, and it doesn't
+take much.
+
+Finally, as a general rule of thumb for both, there are not many methods to interact with on either class...
+on purpose! The functionality outlined here will likely cover >90% of use cases.
+
+
+### Initializing/running a `GtfoApp`: basic example
+
+NOTE: there is a lot that is managed for you via environment variables, so definitely take a look at the
+"**Important Environment Variables**" section to see what you should have defined before trying to run a `GTFO` app.
+
+There are two basic components you need to initialize an app at all:
+
+- `app_function`: the business logic of your application, which should accept exactly one argument: the `Transaction` object
+that will be handed to it for every message.
+
+- `consume_topics_list`: the topics you are consuming. Can be comma-separated string or python list.
+
+That's it! That said, this only covers an app that consumes. To produce, you will additionally need, at minimum:
+
+- `produce_topic_schema_dict`: a dict that maps {'topic_name': _schema_obj_}, where the _schema_obj_ is a valid avro
+schema dict.
+
+Then, to run the app, do:
+
+`GtfoApp.run()`
+
+Altogether, that might look something like this:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoApp
+
+useless_schema = {
+ "name": "CoolSchema",
+ "type": "record",
+ "fields": [
+ {
+ "name": "cool_field",
+ "type": "string"
+ }
+ ]
+}
+
+
+def useless_example_func(transaction, thing_inited_at_runtime):
+ msg = transaction.messages() # can also do transaction.value(); uses the confluent-kafka method with single messages
+ cool_message_out = msg.value()
+ cool_message_out['cool_field'] = thing_inited_at_runtime # 'cool value'
+ transaction.produce({'topic':'cool_topic_out', 'key': msg.key(), 'value': cool_message_out, 'headers': msg.headers()})
+
+init_at_runtime_thing = 'cool value' # can hand off objects you need to init at runtime, like an api session object
+
+gtfo_app = GtfoApp(
+ app_function=useless_example_func,
+ consume_topics_list=['test_topic_1', 'test_topic_2'],
+ produce_topic_schema_dict={'cool_topic_out': useless_schema},
+    app_function_arglist=[init_at_runtime_thing])  # optional! Here to show functionality.
+gtfo_app.run()
+```
+
+### Using `GtfoBatchApp` (plus `BatchTransaction`)
+
+Sometimes, you want to handle multiple messages at once, such as doing a bulk upload of data to an API.
+In this case, treating each message as a separate transaction doesn't make much sense! For this, we have `GtfoBatchApp`!
+
+We still rely on much of what `Gtfo` and `Transaction` lays out, but now we can specify how many messages should
+be consumed by default for a given transaction. Additionally, you can consume more messages on demand with the
+`BatchTransaction` object via `BatchTransaction.consume()` in your `app_function`, in case you'd like to do the normal
+consume pattern most of the time, but might need that to change on demand.
+
+Biggest thing to note here: all messages for that transaction will (just like `Transaction`) be accessible via
+`BatchTransaction.messages()`. Also, all of them get committed when you finish with that instance of `app_function` just
+like a typical singleton run of `app_function`, so keep that in mind!
+
+You can tweak the default batch size via `NU_CONSUMER_DEFAULT_BATCH_CONSUME_MAX_COUNT`; it defaults to 1.
+
+Here is an example:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoBatchApp
+from cool_api import bulk_upload
+
+useless_schema = {
+ "name": "CoolSchema",
+ "type": "record",
+ "fields": [
+ {
+ "name": "cool_field",
+ "type": "string"
+ }
+ ]
+}
+
+def prep_data(msg_val):
+ # do stuff
+ pass
+
+def useless_example_func(bulk_transaction):
+    msgs = bulk_transaction.messages()  # in a batch app this returns the list of consumed messages
+    last_msg = msgs[-1]
+    # I originally set my max to 10 via the env var...but with some logic, if a message says I should do a bigger batch...
+    if last_msg.headers()['needs_bigger_bulk'] == 'true':
+        bulk_transaction.consume(consume_max_count=500)  # allow me to raise that max to 500, and consume up to that (will consume up to 490 more)!
+    bulk_upload([prep_data(msg.value()) for msg in bulk_transaction.messages()])  # push everything that was consumed
+
+gtfo_app = GtfoBatchApp(
+ app_function=useless_example_func,
+ consume_topic='test_topic_in',
+ produce_topic_schema_dict={'cool_topic_out': useless_schema})
+gtfo_app.run()
+```
+
+
+### Using `GtfoTableApp` (plus `TableTransaction`)
+
+One of the more complex components of GTFO is the `GtfoTableApp`. It allows you to store state based on a
+kafka topic via a localized datastore. Basically, you can store whatever you want with respect to a given kafka key,
+and later reference/compare that data against a new version of that key.
+
+There are some functional caveats/limitations that come along with this feature set:
+- The app can only consume from one topic (you can make an upstream "funnel" app as needed).
+- You must create a topic named `{NU_APP_NAME}__changelog`, with the same partition count as the topic it
+would consume from.
+- You can only store/reference things based on the same key as the current message.
+- Each instance of your application needs to use the same volume.
+- Data needs to be stored as a JSON-serializable dict (it is stored as a string and `json.loads`-ed at runtime).
+
+With that in mind, set up is almost exactly the same as `GtfoApp`; here is an example:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoTableApp
+
+useless_schema = {
+ "name": "CoolSchema",
+ "type": "record",
+ "fields": [
+ {
+ "name": "cool_field",
+ "type": "string"
+ }
+ ]
+}
+
+
+def useless_example_func(transaction):
+ msg = transaction.messages() # can also do transaction.value(); uses the confluent-kafka method with single messages
+ cool_message_out = msg.value()
+ previous_message = transaction.read_table_entry()
+    # let's only do stuff if our previous version for the same key was "just cool"!
+ if previous_message['cool_field'] == 'just cool':
+ cool_message_out['cool_field'] = 'very cool now'
+ transaction.update_table_entry(cool_message_out) # we want to update the table with our new value. It does not do this automatically!
+ transaction.produce({'topic':'cool_topic_out', 'key': msg.key(), 'value': cool_message_out, 'headers': msg.headers()})
+
+
+gtfo_app = GtfoTableApp(
+ app_function=useless_example_func,
+ consume_topic='test_topic_in',
+ produce_topic_schema_dict={'cool_topic_out': useless_schema})
+gtfo_app.run()
+```
+
+### Extending `GtfoApp` and `Transaction`
+
+Of course, you can never cover _every_ use case! As such, each class was designed
+with extensibility in mind!
+
+Most often, what needs more customization is your consumption pattern (hence why there was already a bulk class!),
+and there is a relatively painless way to address that with minimal alterations.
+
+There is an `init` argument on `GtfoApp` named `transaction_type`; this allows you to easily pass an augmented
+version of `Transaction` with your changed consumption pattern, potentially without changing the `Transaction` class
+at all!
+
+Otherwise, hopefully things have been compartmentalized enough that you can just replace methods as needed;
+in practice, you'll usually need to adjust both classes a little, but likely mostly `Transaction`.
+
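+As a rough sketch of the `transaction_type` hook mentioned above (assuming `Transaction` can be imported from the same
+`transaction_utils` module; the overridden method below is purely illustrative, so check the actual class for the methods
+your consumption pattern needs):
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoApp, Transaction
+
+
+class LoggingTransaction(Transaction):
+    """Hypothetical subclass; override whichever consume-related methods your pattern needs."""
+    def messages(self):
+        msg = super().messages()
+        print(f"handling key: {msg.key()}")  # swap in real logging/behavior changes as needed
+        return msg
+
+
+def my_app_function(transaction):
+    print(transaction.messages().value())
+
+
+gtfo_app = GtfoApp(
+    app_function=my_app_function,
+    consume_topics_list=['test_topic_in'],
+    transaction_type=LoggingTransaction)  # your custom Transaction subclass
+gtfo_app.run()
+```
+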
+## Monitoring
+The monitoring utils enable metrics to be surfaced from the kafka applications
+so the Prometheus server can scrape them.
+The Prometheus server can't dynamically figure out pod IPs and scrape the
+services directly, so we're using a metrics cache instead.
+
+The metrics cache is a StatefulSet with 2 services assigned to it.
+One service is a normal service, with a unique cluster IP address.
+The prometheus server scrapes this service endpoint.
+The other service doesn't have a cluster IP,
+which means that the monitoring utility can find the IP addresses of each
+of the backing pods, and send metrics to all of the pods.
+This setup gives us high-availability guarantees.
+
+The Monitoring utils are organized into two classes, `MetricsPusher` and `MetricsManager`.
+
+The `MetricsManager` is a container for all of the metrics for the app,
+and contains convenience methods for the 3 standardized metrics.
+These metrics are
+- `messages_consumed`: The number of messages consumed by the app
+- `messages_produced`: The number of messages produced by the app
+- `message_errors`: The number of exceptions caught in the app (labeled by type)
+
+The `MetricsPusher` handles pushing the application's metrics to the metrics cache.
+It determines the list of IP addresses for all of the metrics cache pods,
+and sends the current metrics values for all of the metrics.
+
+### Metric names and labels
+The names of the metrics in Prometheus are the same as their parameter names:
+- `messages_consumed`
+- `messages_produced`
+- `message_errors`
+
+Two labels exist for every metric:
+- `app`: The name of the microservice the metric came from
+- `job`: The name of the individual pod the metric came from
+
+The `message_errors` metric also has another label:
+- `exception`: The name of the exception that triggered the metric
+
+### Monitoring Setup Examples
+The initialization and update loop for application monitoring will differ
+from application to application based on their architecture.
+The following examples should cover the standard designs we use.
+
+#### Default Kafka Client Application
+A Kafka application that directly relies on interacting with Producer or
+Consumer clients should have its monitoring classes set up and its
+pushing thread started in the main run function and passed to the loop, as follows:
+```python
+import os
+
+from confluent_kafka import Consumer, Producer
+from nubium_utils.metrics import MetricsManager, MetricsPusher, start_pushing_metrics
+
+def run_function():
+
+    # the real clients need librdkafka config dicts (e.g. bootstrap.servers, and group.id for the consumer); omitted here for brevity
+    consumer = Consumer()
+    producer = Producer()
+
+ metrics_pusher = MetricsPusher(
+ os.environ['HOSTNAME'],
+ os.environ['METRICS_SERVICE_NAME'],
+ os.environ['METRICS_SERVICE_PORT'],
+ os.environ['METRICS_POD_PORT'])
+ metrics_manager = MetricsManager(job=os.environ['HOSTNAME'], app=os.environ['APP_NAME'], metrics_pusher=metrics_pusher)
+ start_pushing_metrics(metrics_manager, int(os.environ['METRICS_PUSH_RATE']))
+
+ try:
+ while True:
+ loop_function(consumer, producer, metrics_manager=metrics_manager)
+ finally:
+ consumer.close()
+
+```
+
+The `consume_message()` function from this library expects a metrics_manager object
+as an argument, so that it can increment the `messages_consumed` metric.
+
+The `messages_produced` metric needs to be incremented by the application itself
+whenever a Kafka message is produced. The convenience method on the metrics_manager,
+`inc_messages_produced()`, makes this easier,
+since it automatically adds the necessary labels to the metric.
+
+The application also needs to be set to increment the `message_errors` metric
+whenever an exception is caught.
+
+An example loop function might look like this:
+```python
+import os
+import logging
+
+from nubium_utils import consume_message
+from nubium_utils.custom_exceptions import NoMessageError
+
+
+def loop_function(consumer, producer, metrics_manager):
+ try:
+ message = consume_message(consumer, int(os.environ['CONSUMER_POLL_TIMEOUT']), metrics_manager)
+ outgoing_key = message.value()['email_address']
+        producer.produce(topic='outgoing_topic', key=outgoing_key, value=message.value())
+ metrics_manager.inc_messages_produced(1)
+ except NoMessageError:
+ pass
+ except KeyError as error:
+ metrics_manager.inc_message_errors(error)
+ logging.debug('Message missing email address')
+
+
+```
+
+#### Flask Kafka Application
+The setup becomes a little bit different with a Flask application.
+The metrics_manager should be accessible through the app's configuration,
+so that it can be accessed in route functions.
+
+The preferred method for error monitoring is to hook into the built-in
+Flask error handling loop, using the `@app.errorhandler` decorator.
+Here is an example `create_app()` function:
+
+```python
+import flask
+from werkzeug.exceptions import HTTPException
+
+from .forms_producer_app import forms_producer
+from .util_blueprint import app_util_bp
+
+def create_app(config):
+ """
+ Creates app from config and needed blueprints
+ :param config: (Config) object used to configure the flask app
+ :return: (flask.App) the application object
+ """
+ app = flask.Flask(__name__)
+ app.config.from_object(config)
+
+ app.register_blueprint(forms_producer)
+ app.register_blueprint(app_util_bp)
+
+ @app.errorhandler(HTTPException)
+ def handle_exception(e):
+ """
+ Increment error gauge on metrics_manager before returning error message
+ """
+ response = e.get_response()
+ response.data = f'{e.code}:{e.name} - {e.description}'
+ app.config['MONITOR'].inc_message_errors(e)
+ return response
+
+ @app.errorhandler(Exception)
+ def unhandled_exception(error):
+ app.logger.error(f'Unhandled exception: {error}')
+ app.config['MONITOR'].inc_message_errors(error)
+ return f'Unhandled exception: {error}', 500
+
+ return app
+```
+
+The route functions for produced messages should increase the `messages_produced`
+metric when necessary.
+Example:
+```python
+from flask import current_app, jsonify, request
+
+# AUTH, LOGGER, forms_producer (the Blueprint), and produce_message_callback are defined elsewhere in the app
+@forms_producer.route('/', methods=["POST"])
+@AUTH.login_required
+def handle_form():
+ """
+ Ingests a dynamic form from Eloqua and produces it to the topic
+ """
+ values = request.json
+ string_values = {key: str(value) for key, value in values.items()}
+ LOGGER.debug(f'Processing form: {values}')
+
+ current_app.config['PRODUCER'].produce(
+ topic=current_app.config['TOPIC'],
+ key=values['C_EmailAddress'],
+ value={'form_data': string_values},
+ on_delivery=produce_message_callback
+ )
+ current_app.config['MONITOR'].inc_messages_produced(1)
+
+ return jsonify(success=True)
+```
+
+
+%prep
+%autosetup -n nubium-utils-4.2.0
+
+%build
+%py3_build
+
+%install
+%py3_install
+install -d -m755 %{buildroot}/%{_pkgdocdir}
+if [ -d doc ]; then cp -arf doc %{buildroot}/%{_pkgdocdir}; fi
+if [ -d docs ]; then cp -arf docs %{buildroot}/%{_pkgdocdir}; fi
+if [ -d example ]; then cp -arf example %{buildroot}/%{_pkgdocdir}; fi
+if [ -d examples ]; then cp -arf examples %{buildroot}/%{_pkgdocdir}; fi
+pushd %{buildroot}
+if [ -d usr/lib ]; then
+ find usr/lib -type f -printf "/%h/%f\n" >> filelist.lst
+fi
+if [ -d usr/lib64 ]; then
+ find usr/lib64 -type f -printf "/%h/%f\n" >> filelist.lst
+fi
+if [ -d usr/bin ]; then
+ find usr/bin -type f -printf "/%h/%f\n" >> filelist.lst
+fi
+if [ -d usr/sbin ]; then
+ find usr/sbin -type f -printf "/%h/%f\n" >> filelist.lst
+fi
+touch doclist.lst
+if [ -d usr/share/man ]; then
+ find usr/share/man -type f -printf "/%h/%f.gz\n" >> doclist.lst
+fi
+popd
+mv %{buildroot}/filelist.lst .
+mv %{buildroot}/doclist.lst .
+
+%files -n python3-nubium-utils -f filelist.lst
+%dir %{python3_sitelib}/*
+
+%files help -f doclist.lst
+%{_docdir}/*
+
+%changelog
+* Fri May 05 2023 Python_Bot <Python_Bot@openeuler.org> - 4.2.0-1
+- Package Spec generated