Diffstat (limited to 'python-nubium-utils.spec')
 -rw-r--r--	python-nubium-utils.spec	1328
 1 file changed, 1328 insertions(+), 0 deletions(-)
diff --git a/python-nubium-utils.spec b/python-nubium-utils.spec
new file mode 100644
index 0000000..8854cc6
--- /dev/null
+++ b/python-nubium-utils.spec
@@ -0,0 +1,1328 @@
+%global _empty_manifest_terminate_build 0
+Name: python-nubium-utils
+Version: 4.2.0
+Release: 1
+Summary: Some Kafka utility functions and patterns for the nubium project
+License: MIT License
+URL: https://gitlab.corp.redhat.com/mkt-ops-de/nubium-utils.git
+Source0: https://mirrors.nju.edu.cn/pypi/web/packages/24/90/9c0cef4d47caf1d9bdc3e53c16edd2322592941cc79baacf73948110c0e9/nubium-utils-4.2.0.tar.gz
+BuildArch: noarch
+
+Requires: python3-fluvii
+Requires: python3-aiosfstream
+Requires: python3-dictdiffer
+Requires: python3-psutil
+Requires: python3-pydantic
+Requires: python3-box
+Requires: python3-dateutil
+Requires: python3-dotenv
+Requires: python3-eloqua-wrapper
+Requires: python3-pytz
+Requires: python3-requests
+Requires: python3-simple-salesforce
+Requires: python3-pip-tools
+Requires: python3-pytest
+Requires: python3-pytest-cov
+Requires: python3-time-machine
+Requires: python3-twine
+
+%description
+# Nubium Utils
+
+## Environment Variables
+
+Nubium-Utils now has a prefix for every environment variable it uses. Every environment
+variable related to Nubium-Utils starts with `NU_`.
+
+Similarly, anything DUDE-related will start with `DUDE_`.
+
+### Important Environment Variables
+
+Nubium-Utils relies on having these environment variables defined:
+- `NU_SCHEMA_REGISTRY_URL`: schema registry URL
+- `NU_SCHEMA_REGISTRY_USERNAME`: schema registry username (if applicable)
+- `NU_SCHEMA_REGISTRY_PASSWORD`: schema registry password (if applicable)
+- `NU_KAFKA_CLUSTERS_CONFIGS_JSON`: JSON of clusters and their respective connection settings. EX:
+`'{"cluster_0": {"url": "url", "username": "un", "password": "pw"}, "cluster_1": {"url": "url", "username": "un", "password": "pw"}}'`
+- `NU_TOPIC_CONFIGS_JSON`: JSON of topics and their respective cluster name (referencing the names in
+`NU_KAFKA_CLUSTERS_CONFIGS_JSON`) plus optional creation settings. EX:
+`'{"topic_a": {"cluster": "cluster_0", "configs": {"num_partitions": 2, "replication_factor": 2, "config": {"segment.ms": 120000}}}, "topic_b": {"cluster": "cluster_1"}}'`
+- `NU_HOSTNAME`: used for identifying unique consumer instances
+- `NU_APP_NAME`: used to define your consumer group
+- `NU_MP_PROJECT`: used for OpenShift deployment
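+
+Since the two `*_JSON` variables hold serialized JSON, a quick `json.loads` check at startup catches
+malformed values early. A minimal sketch (the validation logic is illustrative, not a Nubium-Utils API):
+
+```python
+import json
+import os
+
+# json.loads raises ValueError on malformed input, so this fails fast
+# before any Kafka clients get built.
+clusters = json.loads(os.environ['NU_KAFKA_CLUSTERS_CONFIGS_JSON'])
+topics = json.loads(os.environ['NU_TOPIC_CONFIGS_JSON'])
+
+# Every topic should reference a cluster defined in the clusters map.
+unknown = {t: c['cluster'] for t, c in topics.items() if c['cluster'] not in clusters}
+assert not unknown, f'topics reference undefined clusters: {unknown}'
+```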
+
+
+## Confluent-Kafka GTFO App Classes
+
+### Overview
+Nubium Utils now has an app framework, named `GTFO`, for managing exactly-once-processing
+confluent-kafka applications. The idea is to simplify confluent-kafka down to a kafka-streams-like
+interface, where consumer management is largely handled for you under the hood. This is particularly
+nice for exactly-once processing, which the class implements and uses by default.
+
+There are some other subclasses for more specific use cases, namely `GtfoBatchApp` and `GtfoTableApp`, but
+the base class `GtfoApp` has been written with the intention of being easily extensible. The recommended
+approaches to extending it are described below.
+
+
+### The `Transaction` and `Gtfo` classes
+
+There are two essential classes you should understand before you dive in.
+
+The `Transaction` classes are the heart of the `GTFO` framework; they are what your business
+logic methods will actually interact with on a message-to-message basis. That being said, for most
+use cases, you won't even need to do much with them other than `Transaction.messages()` to get the currently
+consumed message, and `Transaction.produce()` to send a new message out.
+
+The `Gtfo`-based classes generally wrap/interface with the `Transaction` objects, generating a new one for
+every new transaction (which generally consists of consuming a message, producing the desired messages, and
+committing the consumed message). In general, you will really only use the `Gtfo` class to initialize
+everything, and it doesn't take much.
+
+Finally, as a general rule of thumb for both, there are not many methods to interact with on either class...
+on purpose! The functionality outlined here will likely cover more than 90% of use cases.
+
+
+### Initializing/running a `GtfoApp`: basic example
+
+NOTE: a lot is managed for you via environment variables, so definitely take a look at the
+"**Important Environment Variables**" section to see what you should have defined before trying to run a `GTFO` app.
+
+There are two basic components you need to initialize an app at all:
+
+- `app_function`: the business logic of your application, which should take exactly one argument: the
+`Transaction` object that will be handed to it for every message.
+
+- `consume_topics_list`: the topics you are consuming. Can be a comma-separated string or a Python list.
+
+That's it! That being said, this covers an app that only consumes. To produce, you will additionally need, at minimum:
+
+- `produce_topic_schema_dict`: a dict that maps {'topic_name': _schema_obj_}, where the _schema_obj_ is a valid avro
+schema dict.
+
+Then, to run the app, call `.run()` on your `GtfoApp` instance.
+
+Altogether, that might look something like this:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoApp
+
+useless_schema = {
+    "name": "CoolSchema",
+    "type": "record",
+    "fields": [
+        {
+            "name": "cool_field",
+            "type": "string"
+        }
+    ]
+}
+
+
+def useless_example_func(transaction, thing_inited_at_runtime):
+    msg = transaction.messages()  # can also do transaction.value(); uses the confluent-kafka method with single messages
+    cool_message_out = msg.value()
+    cool_message_out['cool_field'] = thing_inited_at_runtime  # 'cool value'
+    transaction.produce({'topic': 'cool_topic_out', 'key': msg.key(), 'value': cool_message_out, 'headers': msg.headers()})
+
+init_at_runtime_thing = 'cool value'  # can hand off objects you need to init at runtime, like an api session object
+
+gtfo_app = GtfoApp(
+    app_function=useless_example_func,
+    consume_topics_list=['test_topic_1', 'test_topic_2'],
+    produce_topic_schema_dict={'cool_topic_out': useless_schema},
+    app_function_arglist=[init_at_runtime_thing])  # optional! Here to show functionality.
+gtfo_app.run()
+```
+
+### Using `GtfoBatchApp` (plus `BatchTransaction`)
+
+Sometimes, you want to handle multiple messages at once, such as doing a bulk upload of data to an API.
+In this case, treating each message as a separate transaction doesn't make much sense! For this, we have `GtfoBatchApp`!
+
+We still rely on much of what `Gtfo` and `Transaction` lay out, but now we can specify how many messages should
+be consumed by default for a given transaction.
+Additionally, you can consume more messages on demand with the `BatchTransaction` object via
+`BatchTransaction.consume()` in your `app_function`, in case you'd like to use the normal consume
+pattern most of the time, but might need it to change on demand.
+
+Biggest thing to note here: all messages for that transaction will (just like `Transaction`) be accessible via
+`BatchTransaction.messages()`. Also, all of them get committed when that instance of `app_function` finishes,
+just like a typical singleton run of `app_function`, so keep that in mind!
+
+You can tweak the default batch size via `NU_CONSUMER_DEFAULT_BATCH_CONSUME_MAX_COUNT`; it defaults to 1.
+
+Here is an example:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoBatchApp
+from cool_api import bulk_upload
+
+useless_schema = {
+    "name": "CoolSchema",
+    "type": "record",
+    "fields": [
+        {
+            "name": "cool_field",
+            "type": "string"
+        }
+    ]
+}
+
+def prep_data(msg_val):
+    # do stuff
+    pass
+
+def useless_example_func(bulk_transaction):
+    msgs = bulk_transaction.messages()
+    last_msg = msgs[-1]
+    # I originally set my max to 10 via the env var...but with some logic, if a message says I should do a bigger batch...
+    if last_msg.headers()['needs_bigger_bulk'] == 'true':
+        bulk_transaction.consume(consume_max_count=500)  # allow me to raise that max to 500, and consume up to that (will consume up to 490 more)!
+    bulk_upload([prep_data(msg.value()) for msg in bulk_transaction.messages()])  # push all 500 messages
+
+gtfo_app = GtfoBatchApp(
+    app_function=useless_example_func,
+    consume_topic='test_topic_in',
+    produce_topic_schema_dict={'cool_topic_out': useless_schema})
+gtfo_app.run()
+```
+
+
+### Using `GtfoTableApp` (plus `TableTransaction`)
+
+One of the more complex components of GTFO is the `GtfoTableApp`. It allows you to store state based on a
+kafka topic via a localized datastore. Basically, you can store whatever you want with respect to a given kafka key,
+and later reference/compare that data against a new version of that key.
+
+There are some functional caveats/limitations that come along with this feature set:
+- The app can only consume from one topic (you can make an upstream "funnel" app as needed).
+- You must make a topic named `{NU_APP_NAME}__changelog`, with the same partition count as the topic it
+would consume from (one way to declare it is sketched after this list).
+- You can only store/reference things based on the same key as the current message.
+- Each instance of your application needs to use the same volume.
+- Data needs to be stored as a json/dict (it is stored as a string and `json.loads`-ed at runtime).
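+
+One way to satisfy the changelog requirement is to declare the changelog topic alongside the input
+topic in `NU_TOPIC_CONFIGS_JSON`. A sketch, assuming `NU_APP_NAME` is `cool_table_app` and the input
+topic has 2 partitions (the names and counts here are illustrative):
+
+```python
+import json
+
+topic_configs = {
+    # the topic the table app consumes
+    "test_topic_in": {"cluster": "cluster_0", "configs": {"num_partitions": 2, "replication_factor": 2}},
+    # its changelog: named {NU_APP_NAME}__changelog, with a matching partition count
+    "cool_table_app__changelog": {"cluster": "cluster_0", "configs": {"num_partitions": 2, "replication_factor": 2}},
+}
+print(json.dumps(topic_configs))  # use the output as the value of NU_TOPIC_CONFIGS_JSON
+```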
+
+With that in mind, setup is almost exactly the same as `GtfoApp`; here is an example:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoTableApp
+
+useless_schema = {
+    "name": "CoolSchema",
+    "type": "record",
+    "fields": [
+        {
+            "name": "cool_field",
+            "type": "string"
+        }
+    ]
+}
+
+
+def useless_example_func(transaction):
+    msg = transaction.messages()  # can also do transaction.value(); uses the confluent-kafka method with single messages
+    cool_message_out = msg.value()
+    previous_message = transaction.read_table_entry()
+    # let's only do stuff if our previous version for the same key was "just cool"!
+    if previous_message['cool_field'] == 'just cool':
+        cool_message_out['cool_field'] = 'very cool now'
+        transaction.update_table_entry(cool_message_out)  # we want to update the table with our new value. It does not do this automatically!
+        transaction.produce({'topic': 'cool_topic_out', 'key': msg.key(), 'value': cool_message_out, 'headers': msg.headers()})
+
+
+gtfo_app = GtfoTableApp(
+    app_function=useless_example_func,
+    consume_topic='test_topic_in',
+    produce_topic_schema_dict={'cool_topic_out': useless_schema})
+gtfo_app.run()
+```
+
+### Extending `GtfoApp` and `Transaction`
+
+Of course, you can never cover _every_ use case! As such, each class was designed
+with extensibility in mind.
+
+Most often, what needs more customization is your consumption pattern (hence the batch class!),
+and there is a relatively painless way to address that with minimal alterations.
+
+There is an `__init__` argument on `GtfoApp` named `transaction_type`; this allows you to pass an
+augmented version of `Transaction` that implements your changed consumption pattern, potentially
+without touching the `GtfoApp` class at all!
+
+Otherwise, hopefully things have been compartmentalized enough that you can just replace methods as
+needed. In general, you'll need to mess a little with both classes, but likely mostly `Transaction`.
+A minimal sketch of the `transaction_type` hook follows.
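+
+In this sketch, the `LoggingTransaction` subclass and its behavior are hypothetical, and it assumes
+`Transaction` is importable from the same module as `GtfoApp`; only the `transaction_type` argument
+and the `produce()` dict signature come from the docs above:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoApp, Transaction
+
+useless_schema = {
+    "name": "CoolSchema",
+    "type": "record",
+    "fields": [{"name": "cool_field", "type": "string"}]
+}
+
+
+class LoggingTransaction(Transaction):
+    """Hypothetical subclass: log every outgoing message before delegating to the parent."""
+    def produce(self, produce_dict):
+        print(f"producing to {produce_dict['topic']}")
+        return super().produce(produce_dict)
+
+
+def passthrough(transaction):
+    msg = transaction.messages()
+    transaction.produce({'topic': 'cool_topic_out', 'key': msg.key(), 'value': msg.value(), 'headers': msg.headers()})
+
+
+gtfo_app = GtfoApp(
+    app_function=passthrough,
+    consume_topics_list=['test_topic_in'],
+    produce_topic_schema_dict={'cool_topic_out': useless_schema},
+    transaction_type=LoggingTransaction)  # the documented hook for swapping Transaction classes
+gtfo_app.run()
+```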
+
+## Monitoring
+The monitoring utils enable metrics to be surfaced from the kafka applications
+so the Prometheus server can scrape them.
+The Prometheus server can't dynamically figure out pod IPs and scrape the
+services directly, so we're using a metrics cache instead.
+
+The metrics cache is a StatefulSet with two services assigned to it.
+One service is a normal service, with a unique cluster IP address.
+The Prometheus server scrapes this service endpoint.
+The other service is headless (it has no cluster IP),
+which means that the monitoring utility can find the IP addresses of each
+of the backing pods and send metrics to all of them.
+This setup gives us high-availability guarantees.
+
+The monitoring utils are organized into two classes, `MetricsPusher` and `MetricsManager`.
+
+The `MetricsManager` is a container for all of the metrics for the app,
+and contains convenience methods for the three standardized metrics.
+These metrics are:
+- `messages_consumed`: The number of messages consumed by the app
+- `messages_produced`: The number of messages produced by the app
+- `message_errors`: The number of exceptions caught in the app (labeled by type)
+
+The `MetricsPusher` handles pushing the application's metrics to the metrics cache.
+It determines the list of IP addresses for all of the metrics cache pods,
+and sends the current values of all of the metrics.
+
+### Metric names and labels
+The names of the metrics in Prometheus are the same as their parameter names:
+- `messages_consumed`
+- `messages_produced`
+- `message_errors`
+
+Two labels exist for every metric:
+- `app`: The name of the microservice the metric came from
+- `job`: The name of the individual pod the metric came from
+
+The `message_errors` metric also has another label:
+- `exception`: The name of the exception that triggered the metric
+
+### Monitoring Setup Examples
+The initialization and update loop for application monitoring will differ
+from application to application based on their architecture.
+The following examples should cover the standard designs we use.
+
+#### Default Kafka Client Application
+A Kafka application that directly relies on interacting with Producer or
+Consumer clients should have its monitoring classes set up and its
+pushing thread started in the main run function and passed to the loop, as follows:
+```python
+import os
+
+from confluent_kafka import Consumer, Producer
+from nubium_utils.metrics import MetricsManager, MetricsPusher, start_pushing_metrics
+
+def run_function():
+
+    consumer = Consumer()
+    producer = Producer()
+
+    metrics_pusher = MetricsPusher(
+        os.environ['HOSTNAME'],
+        os.environ['METRICS_SERVICE_NAME'],
+        os.environ['METRICS_SERVICE_PORT'],
+        os.environ['METRICS_POD_PORT'])
+    metrics_manager = MetricsManager(job=os.environ['HOSTNAME'], app=os.environ['APP_NAME'], metrics_pusher=metrics_pusher)
+    start_pushing_metrics(metrics_manager, int(os.environ['METRICS_PUSH_RATE']))
+
+    try:
+        while True:
+            loop_function(consumer, producer, metrics_manager=metrics_manager)
+    finally:
+        consumer.close()
+```
+
+The `consume_message()` function from this library expects a metrics_manager object
+as an argument, so that it can increment the `messages_consumed` metric.
+
+The `messages_produced` metric needs to be incremented as necessary by the application
+itself whenever a Kafka message is produced. The convenience method on the metrics_manager,
+`inc_messages_produced()`, makes this easier,
+since it automatically adds the necessary labels to the metric.
+
+The application also needs to increment the `message_errors` metric
+whenever an exception is caught.
+
+An example loop function might look like this:
+```python
+import os
+import logging
+
+from nubium_utils import consume_message
+from nubium_utils.custom_exceptions import NoMessageError
+
+
+def loop_function(consumer, producer, metrics_manager):
+    try:
+        message = consume_message(consumer, int(os.environ['CONSUMER_POLL_TIMEOUT']), metrics_manager)
+        outgoing_key = message.value()['email_address']
+        producer.produce(topic='outgoing_topic', key=outgoing_key, value=message.value())
+        metrics_manager.inc_messages_produced(1)
+    except NoMessageError:
+        pass
+    except KeyError as error:
+        metrics_manager.inc_message_errors(error)
+        logging.debug('Message missing email address')
+```
+
+#### Flask Kafka Application
+The setup becomes a little different with a Flask application.
+The metrics_manager should be accessible through the app's configuration,
+so that it can be accessed in route functions.
+
+The preferred method for error monitoring is to hook into the built-in
+Flask error handling loop, using the `@app.errorhandler` decorator.
+Here is an example `create_app()` function:
+
+```python
+import flask
+from werkzeug.exceptions import HTTPException
+
+from .forms_producer_app import forms_producer
+from .util_blueprint import app_util_bp
+
+def create_app(config):
+    """
+    Creates app from config and needed blueprints
+    :param config: (Config) object used to configure the flask app
+    :return: (flask.Flask) the application object
+    """
+    app = flask.Flask(__name__)
+    app.config.from_object(config)
+
+    app.register_blueprint(forms_producer)
+    app.register_blueprint(app_util_bp)
+
+    @app.errorhandler(HTTPException)
+    def handle_exception(e):
+        """
+        Increment error gauge on metrics_manager before returning error message
+        """
+        response = e.get_response()
+        response.data = f'{e.code}:{e.name} - {e.description}'
+        app.config['MONITOR'].inc_message_errors(e)
+        return response
+
+    @app.errorhandler(Exception)
+    def unhandled_exception(error):
+        app.logger.error(f'Unhandled exception: {error}')
+        app.config['MONITOR'].inc_message_errors(error)
+        return f'Unhandled exception: {error}', 500
+
+    return app
+```
+
+The route functions for produced messages should increase the `messages_produced`
+metric when necessary.
+Example:
+```python
+from flask import current_app, jsonify, request
+
+# LOGGER, AUTH, and produce_message_callback are defined elsewhere in the app
+@forms_producer.route('/', methods=["POST"])
+@AUTH.login_required
+def handle_form():
+    """
+    Ingests a dynamic form from Eloqua and produces it to the topic
+    """
+    values = request.json
+    string_values = {key: str(value) for key, value in values.items()}
+    LOGGER.debug(f'Processing form: {values}')
+
+    current_app.config['PRODUCER'].produce(
+        topic=current_app.config['TOPIC'],
+        key=values['C_EmailAddress'],
+        value={'form_data': string_values},
+        on_delivery=produce_message_callback
+    )
+    current_app.config['MONITOR'].inc_messages_produced(1)
+
+    return jsonify(success=True)
+```
+
+
+%package -n python3-nubium-utils
+Summary: Some Kafka utility functions and patterns for the nubium project
+Provides: python-nubium-utils
+BuildRequires: python3-devel
+BuildRequires: python3-setuptools
+BuildRequires: python3-pip
+%description -n python3-nubium-utils
+# Nubium Utils
+
+## Environment Variables
+
+Nubium-Utils now has a prefix for every environment variable it uses. Every environment
+variable related to Nubium-Utils starts with `NU_`.
+
+Similarly, anything DUDE-related will start with `DUDE_`.
+
+### Important Environment Variables
+
+Nubium-Utils relies on having these environment variables defined:
+- `NU_SCHEMA_REGISTRY_URL`: schema registry URL
+- `NU_SCHEMA_REGISTRY_USERNAME`: schema registry username (if applicable)
+- `NU_SCHEMA_REGISTRY_PASSWORD`: schema registry password (if applicable)
+- `NU_KAFKA_CLUSTERS_CONFIGS_JSON`: JSON of clusters and their respective connection settings. EX:
+`'{"cluster_0": {"url": "url", "username": "un", "password": "pw"}, "cluster_1": {"url": "url", "username": "un", "password": "pw"}}'`
+- `NU_TOPIC_CONFIGS_JSON`: JSON of topics and their respective cluster name (referencing the names in
+`NU_KAFKA_CLUSTERS_CONFIGS_JSON`) plus optional creation settings. EX:
+`'{"topic_a": {"cluster": "cluster_0", "configs": {"num_partitions": 2, "replication_factor": 2, "config": {"segment.ms": 120000}}}, "topic_b": {"cluster": "cluster_1"}}'`
+- `NU_HOSTNAME`: used for identifying unique consumer instances
+- `NU_APP_NAME`: used to define your consumer group
+- `NU_MP_PROJECT`: used for OpenShift deployment
+
+
+## Confluent-Kafka GTFO App Classes
+
+### Overview
+Nubium Utils now has an app framework, named `GTFO`, for managing exactly-once-processing
+confluent-kafka applications.
+The idea is to simplify confluent-kafka down to a kafka-streams-like interface, where consumer
+management is largely handled for you under the hood. This is particularly nice for exactly-once
+processing, which the class implements and uses by default.
+
+There are some other subclasses for more specific use cases, namely `GtfoBatchApp` and `GtfoTableApp`, but
+the base class `GtfoApp` has been written with the intention of being easily extensible. The recommended
+approaches to extending it are described below.
+
+
+### The `Transaction` and `Gtfo` classes
+
+There are two essential classes you should understand before you dive in.
+
+The `Transaction` classes are the heart of the `GTFO` framework; they are what your business
+logic methods will actually interact with on a message-to-message basis. That being said, for most
+use cases, you won't even need to do much with them other than `Transaction.messages()` to get the currently
+consumed message, and `Transaction.produce()` to send a new message out.
+
+The `Gtfo`-based classes generally wrap/interface with the `Transaction` objects, generating a new one for
+every new transaction (which generally consists of consuming a message, producing the desired messages, and
+committing the consumed message). In general, you will really only use the `Gtfo` class to initialize
+everything, and it doesn't take much.
+
+Finally, as a general rule of thumb for both, there are not many methods to interact with on either class...
+on purpose! The functionality outlined here will likely cover more than 90% of use cases.
+
+
+### Initializing/running a `GtfoApp`: basic example
+
+NOTE: a lot is managed for you via environment variables, so definitely take a look at the
+"**Important Environment Variables**" section to see what you should have defined before trying to run a `GTFO` app.
+
+There are two basic components you need to initialize an app at all:
+
+- `app_function`: the business logic of your application, which should take exactly one argument: the
+`Transaction` object that will be handed to it for every message.
+
+- `consume_topics_list`: the topics you are consuming. Can be a comma-separated string or a Python list.
+
+That's it! That being said, this covers an app that only consumes. To produce, you will additionally need, at minimum:
+
+- `produce_topic_schema_dict`: a dict that maps {'topic_name': _schema_obj_}, where the _schema_obj_ is a valid avro
+schema dict.
+
+Then, to run the app, call `.run()` on your `GtfoApp` instance.
+
+Altogether, that might look something like this:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoApp
+
+useless_schema = {
+    "name": "CoolSchema",
+    "type": "record",
+    "fields": [
+        {
+            "name": "cool_field",
+            "type": "string"
+        }
+    ]
+}
+
+
+def useless_example_func(transaction, thing_inited_at_runtime):
+    msg = transaction.messages()  # can also do transaction.value(); uses the confluent-kafka method with single messages
+    cool_message_out = msg.value()
+    cool_message_out['cool_field'] = thing_inited_at_runtime  # 'cool value'
+    transaction.produce({'topic': 'cool_topic_out', 'key': msg.key(), 'value': cool_message_out, 'headers': msg.headers()})
+
+init_at_runtime_thing = 'cool value'  # can hand off objects you need to init at runtime, like an api session object
+
+gtfo_app = GtfoApp(
+    app_function=useless_example_func,
+    consume_topics_list=['test_topic_1', 'test_topic_2'],
+    produce_topic_schema_dict={'cool_topic_out': useless_schema},
+    app_function_arglist=[init_at_runtime_thing])  # optional! Here to show functionality.
+gtfo_app.run()
+```
+
+### Using `GtfoBatchApp` (plus `BatchTransaction`)
+
+Sometimes, you want to handle multiple messages at once, such as doing a bulk upload of data to an API.
+In this case, treating each message as a separate transaction doesn't make much sense! For this, we have `GtfoBatchApp`!
+
+We still rely on much of what `Gtfo` and `Transaction` lay out, but now we can specify how many messages should
+be consumed by default for a given transaction. Additionally, you can consume more messages on demand with the
+`BatchTransaction` object via `BatchTransaction.consume()` in your `app_function`, in case you'd like to use the
+normal consume pattern most of the time, but might need it to change on demand.
+
+Biggest thing to note here: all messages for that transaction will (just like `Transaction`) be accessible via
+`BatchTransaction.messages()`. Also, all of them get committed when that instance of `app_function` finishes,
+just like a typical singleton run of `app_function`, so keep that in mind!
+
+You can tweak the default batch size via `NU_CONSUMER_DEFAULT_BATCH_CONSUME_MAX_COUNT`; it defaults to 1.
+
+Here is an example:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoBatchApp
+from cool_api import bulk_upload
+
+useless_schema = {
+    "name": "CoolSchema",
+    "type": "record",
+    "fields": [
+        {
+            "name": "cool_field",
+            "type": "string"
+        }
+    ]
+}
+
+def prep_data(msg_val):
+    # do stuff
+    pass
+
+def useless_example_func(bulk_transaction):
+    msgs = bulk_transaction.messages()
+    last_msg = msgs[-1]
+    # I originally set my max to 10 via the env var...but with some logic, if a message says I should do a bigger batch...
+    if last_msg.headers()['needs_bigger_bulk'] == 'true':
+        bulk_transaction.consume(consume_max_count=500)  # allow me to raise that max to 500, and consume up to that (will consume up to 490 more)!
+    bulk_upload([prep_data(msg.value()) for msg in bulk_transaction.messages()])  # push all 500 messages
+
+gtfo_app = GtfoBatchApp(
+    app_function=useless_example_func,
+    consume_topic='test_topic_in',
+    produce_topic_schema_dict={'cool_topic_out': useless_schema})
+gtfo_app.run()
+```
+
+
+### Using `GtfoTableApp` (plus `TableTransaction`)
+
+One of the more complex components of GTFO is the `GtfoTableApp`. It allows you to store state based on a
+kafka topic via a localized datastore.
+Basically, you can store whatever you want with respect to a given kafka key,
+and later reference/compare that data against a new version of that key.
+
+There are some functional caveats/limitations that come along with this feature set:
+- The app can only consume from one topic (you can make an upstream "funnel" app as needed).
+- You must make a topic named `{NU_APP_NAME}__changelog`, with the same partition count as the topic it
+would consume from.
+- You can only store/reference things based on the same key as the current message.
+- Each instance of your application needs to use the same volume.
+- Data needs to be stored as a json/dict (it is stored as a string and `json.loads`-ed at runtime).
+
+With that in mind, setup is almost exactly the same as `GtfoApp`; here is an example:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoTableApp
+
+useless_schema = {
+    "name": "CoolSchema",
+    "type": "record",
+    "fields": [
+        {
+            "name": "cool_field",
+            "type": "string"
+        }
+    ]
+}
+
+
+def useless_example_func(transaction):
+    msg = transaction.messages()  # can also do transaction.value(); uses the confluent-kafka method with single messages
+    cool_message_out = msg.value()
+    previous_message = transaction.read_table_entry()
+    # let's only do stuff if our previous version for the same key was "just cool"!
+    if previous_message['cool_field'] == 'just cool':
+        cool_message_out['cool_field'] = 'very cool now'
+        transaction.update_table_entry(cool_message_out)  # we want to update the table with our new value. It does not do this automatically!
+        transaction.produce({'topic': 'cool_topic_out', 'key': msg.key(), 'value': cool_message_out, 'headers': msg.headers()})
+
+
+gtfo_app = GtfoTableApp(
+    app_function=useless_example_func,
+    consume_topic='test_topic_in',
+    produce_topic_schema_dict={'cool_topic_out': useless_schema})
+gtfo_app.run()
+```
+
+### Extending `GtfoApp` and `Transaction`
+
+Of course, you can never cover _every_ use case! As such, each class was designed
+with extensibility in mind.
+
+Most often, what needs more customization is your consumption pattern (hence the batch class!),
+and there is a relatively painless way to address that with minimal alterations.
+
+There is an `__init__` argument on `GtfoApp` named `transaction_type`; this allows you to pass an
+augmented version of `Transaction` that implements your changed consumption pattern, potentially
+without touching the `GtfoApp` class at all!
+
+Otherwise, hopefully things have been compartmentalized enough that you can just replace methods as
+needed. In general, you'll need to mess a little with both classes, but likely mostly `Transaction`.
+
+## Monitoring
+The monitoring utils enable metrics to be surfaced from the kafka applications
+so the Prometheus server can scrape them.
+The Prometheus server can't dynamically figure out pod IPs and scrape the
+services directly, so we're using a metrics cache instead.
+
+The metrics cache is a StatefulSet with two services assigned to it.
+One service is a normal service, with a unique cluster IP address.
+The Prometheus server scrapes this service endpoint.
+The other service is headless (it has no cluster IP),
+which means that the monitoring utility can find the IP addresses of each
+of the backing pods and send metrics to all of them.
+This setup gives us high-availability guarantees.
+
+The monitoring utils are organized into two classes, `MetricsPusher` and `MetricsManager`.
+
+The `MetricsManager` is a container for all of the metrics for the app,
+and contains convenience methods for the three standardized metrics.
+These metrics are:
+- `messages_consumed`: The number of messages consumed by the app
+- `messages_produced`: The number of messages produced by the app
+- `message_errors`: The number of exceptions caught in the app (labeled by type)
+
+The `MetricsPusher` handles pushing the application's metrics to the metrics cache.
+It determines the list of IP addresses for all of the metrics cache pods,
+and sends the current values of all of the metrics.
+
+### Metric names and labels
+The names of the metrics in Prometheus are the same as their parameter names:
+- `messages_consumed`
+- `messages_produced`
+- `message_errors`
+
+Two labels exist for every metric:
+- `app`: The name of the microservice the metric came from
+- `job`: The name of the individual pod the metric came from
+
+The `message_errors` metric also has another label:
+- `exception`: The name of the exception that triggered the metric
+
+### Monitoring Setup Examples
+The initialization and update loop for application monitoring will differ
+from application to application based on their architecture.
+The following examples should cover the standard designs we use.
+
+#### Default Kafka Client Application
+A Kafka application that directly relies on interacting with Producer or
+Consumer clients should have its monitoring classes set up and its
+pushing thread started in the main run function and passed to the loop, as follows:
+```python
+import os
+
+from confluent_kafka import Consumer, Producer
+from nubium_utils.metrics import MetricsManager, MetricsPusher, start_pushing_metrics
+
+def run_function():
+
+    consumer = Consumer()
+    producer = Producer()
+
+    metrics_pusher = MetricsPusher(
+        os.environ['HOSTNAME'],
+        os.environ['METRICS_SERVICE_NAME'],
+        os.environ['METRICS_SERVICE_PORT'],
+        os.environ['METRICS_POD_PORT'])
+    metrics_manager = MetricsManager(job=os.environ['HOSTNAME'], app=os.environ['APP_NAME'], metrics_pusher=metrics_pusher)
+    start_pushing_metrics(metrics_manager, int(os.environ['METRICS_PUSH_RATE']))
+
+    try:
+        while True:
+            loop_function(consumer, producer, metrics_manager=metrics_manager)
+    finally:
+        consumer.close()
+```
+
+The `consume_message()` function from this library expects a metrics_manager object
+as an argument, so that it can increment the `messages_consumed` metric.
+
+The `messages_produced` metric needs to be incremented as necessary by the application
+itself whenever a Kafka message is produced. The convenience method on the metrics_manager,
+`inc_messages_produced()`, makes this easier,
+since it automatically adds the necessary labels to the metric.
+
+The application also needs to increment the `message_errors` metric
+whenever an exception is caught.
+
+An example loop function might look like this:
+```python
+import os
+import logging
+
+from nubium_utils import consume_message
+from nubium_utils.custom_exceptions import NoMessageError
+
+
+def loop_function(consumer, producer, metrics_manager):
+    try:
+        message = consume_message(consumer, int(os.environ['CONSUMER_POLL_TIMEOUT']), metrics_manager)
+        outgoing_key = message.value()['email_address']
+        producer.produce(topic='outgoing_topic', key=outgoing_key, value=message.value())
+        metrics_manager.inc_messages_produced(1)
+    except NoMessageError:
+        pass
+    except KeyError as error:
+        metrics_manager.inc_message_errors(error)
+        logging.debug('Message missing email address')
+```
+
+#### Flask Kafka Application
+The setup becomes a little different with a Flask application.
+The metrics_manager should be accessible through the app's configuration,
+so that it can be accessed in route functions.
+
+The preferred method for error monitoring is to hook into the built-in
+Flask error handling loop, using the `@app.errorhandler` decorator.
+Here is an example `create_app()` function:
+
+```python
+import flask
+from werkzeug.exceptions import HTTPException
+
+from .forms_producer_app import forms_producer
+from .util_blueprint import app_util_bp
+
+def create_app(config):
+    """
+    Creates app from config and needed blueprints
+    :param config: (Config) object used to configure the flask app
+    :return: (flask.Flask) the application object
+    """
+    app = flask.Flask(__name__)
+    app.config.from_object(config)
+
+    app.register_blueprint(forms_producer)
+    app.register_blueprint(app_util_bp)
+
+    @app.errorhandler(HTTPException)
+    def handle_exception(e):
+        """
+        Increment error gauge on metrics_manager before returning error message
+        """
+        response = e.get_response()
+        response.data = f'{e.code}:{e.name} - {e.description}'
+        app.config['MONITOR'].inc_message_errors(e)
+        return response
+
+    @app.errorhandler(Exception)
+    def unhandled_exception(error):
+        app.logger.error(f'Unhandled exception: {error}')
+        app.config['MONITOR'].inc_message_errors(error)
+        return f'Unhandled exception: {error}', 500
+
+    return app
+```
+
+The route functions for produced messages should increase the `messages_produced`
+metric when necessary.
+Example:
+```python
+from flask import current_app, jsonify, request
+
+# LOGGER, AUTH, and produce_message_callback are defined elsewhere in the app
+@forms_producer.route('/', methods=["POST"])
+@AUTH.login_required
+def handle_form():
+    """
+    Ingests a dynamic form from Eloqua and produces it to the topic
+    """
+    values = request.json
+    string_values = {key: str(value) for key, value in values.items()}
+    LOGGER.debug(f'Processing form: {values}')
+
+    current_app.config['PRODUCER'].produce(
+        topic=current_app.config['TOPIC'],
+        key=values['C_EmailAddress'],
+        value={'form_data': string_values},
+        on_delivery=produce_message_callback
+    )
+    current_app.config['MONITOR'].inc_messages_produced(1)
+
+    return jsonify(success=True)
+```
+
+
+%package help
+Summary: Development documents and examples for nubium-utils
+Provides: python3-nubium-utils-doc
+%description help
+# Nubium Utils
+
+## Environment Variables
+
+Nubium-Utils now has a prefix for every environment variable it uses. Every environment
+variable related to Nubium-Utils starts with `NU_`.
+
+Similarly, anything DUDE-related will start with `DUDE_`.
+
+### Important Environment Variables
+
+Nubium-Utils relies on having these environment variables defined:
+- `NU_SCHEMA_REGISTRY_URL`: schema registry URL
+- `NU_SCHEMA_REGISTRY_USERNAME`: schema registry username (if applicable)
+- `NU_SCHEMA_REGISTRY_PASSWORD`: schema registry password (if applicable)
+- `NU_KAFKA_CLUSTERS_CONFIGS_JSON`: JSON of clusters and their respective connection settings. EX:
+`'{"cluster_0": {"url": "url", "username": "un", "password": "pw"}, "cluster_1": {"url": "url", "username": "un", "password": "pw"}}'`
+- `NU_TOPIC_CONFIGS_JSON`: JSON of topics and their respective cluster name (referencing the names in
+`NU_KAFKA_CLUSTERS_CONFIGS_JSON`) plus optional creation settings. EX:
+`'{"topic_a": {"cluster": "cluster_0", "configs": {"num_partitions": 2, "replication_factor": 2, "config": {"segment.ms": 120000}}}, "topic_b": {"cluster": "cluster_1"}}'`
+- `NU_HOSTNAME`: used for identifying unique consumer instances
+- `NU_APP_NAME`: used to define your consumer group
+- `NU_MP_PROJECT`: used for OpenShift deployment
+
+
+## Confluent-Kafka GTFO App Classes
+
+### Overview
+Nubium Utils now has an app framework, named `GTFO`, for managing exactly-once-processing
+confluent-kafka applications. The idea is to simplify confluent-kafka down to a kafka-streams-like
+interface, where consumer management is largely handled for you under the hood. This is particularly
+nice for exactly-once processing, which the class implements and uses by default.
+
+There are some other subclasses for more specific use cases, namely `GtfoBatchApp` and `GtfoTableApp`, but
+the base class `GtfoApp` has been written with the intention of being easily extensible. The recommended
+approaches to extending it are described below.
+
+
+### The `Transaction` and `Gtfo` classes
+
+There are two essential classes you should understand before you dive in.
+
+The `Transaction` classes are the heart of the `GTFO` framework; they are what your business
+logic methods will actually interact with on a message-to-message basis. That being said, for most
+use cases, you won't even need to do much with them other than `Transaction.messages()` to get the currently
+consumed message, and `Transaction.produce()` to send a new message out.
+
+The `Gtfo`-based classes generally wrap/interface with the `Transaction` objects, generating a new one for
+every new transaction (which generally consists of consuming a message, producing the desired messages, and
+committing the consumed message). In general, you will really only use the `Gtfo` class to initialize
+everything, and it doesn't take much.
+
+Finally, as a general rule of thumb for both, there are not many methods to interact with on either class...
+on purpose! The functionality outlined here will likely cover more than 90% of use cases.
+
+
+### Initializing/running a `GtfoApp`: basic example
+
+NOTE: a lot is managed for you via environment variables, so definitely take a look at the
+"**Important Environment Variables**" section to see what you should have defined before trying to run a `GTFO` app.
+
+There are two basic components you need to initialize an app at all:
+
+- `app_function`: the business logic of your application, which should take exactly one argument: the
+`Transaction` object that will be handed to it for every message.
+
+- `consume_topics_list`: the topics you are consuming. Can be a comma-separated string or a Python list.
+
+That's it! That being said, this covers an app that only consumes.
+To produce, you will additionally need, at minimum:
+
+- `produce_topic_schema_dict`: a dict that maps {'topic_name': _schema_obj_}, where the _schema_obj_ is a valid avro
+schema dict.
+
+Then, to run the app, call `.run()` on your `GtfoApp` instance.
+
+Altogether, that might look something like this:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoApp
+
+useless_schema = {
+    "name": "CoolSchema",
+    "type": "record",
+    "fields": [
+        {
+            "name": "cool_field",
+            "type": "string"
+        }
+    ]
+}
+
+
+def useless_example_func(transaction, thing_inited_at_runtime):
+    msg = transaction.messages()  # can also do transaction.value(); uses the confluent-kafka method with single messages
+    cool_message_out = msg.value()
+    cool_message_out['cool_field'] = thing_inited_at_runtime  # 'cool value'
+    transaction.produce({'topic': 'cool_topic_out', 'key': msg.key(), 'value': cool_message_out, 'headers': msg.headers()})
+
+init_at_runtime_thing = 'cool value'  # can hand off objects you need to init at runtime, like an api session object
+
+gtfo_app = GtfoApp(
+    app_function=useless_example_func,
+    consume_topics_list=['test_topic_1', 'test_topic_2'],
+    produce_topic_schema_dict={'cool_topic_out': useless_schema},
+    app_function_arglist=[init_at_runtime_thing])  # optional! Here to show functionality.
+gtfo_app.run()
+```
+
+### Using `GtfoBatchApp` (plus `BatchTransaction`)
+
+Sometimes, you want to handle multiple messages at once, such as doing a bulk upload of data to an API.
+In this case, treating each message as a separate transaction doesn't make much sense! For this, we have `GtfoBatchApp`!
+
+We still rely on much of what `Gtfo` and `Transaction` lay out, but now we can specify how many messages should
+be consumed by default for a given transaction. Additionally, you can consume more messages on demand with the
+`BatchTransaction` object via `BatchTransaction.consume()` in your `app_function`, in case you'd like to use the
+normal consume pattern most of the time, but might need it to change on demand.
+
+Biggest thing to note here: all messages for that transaction will (just like `Transaction`) be accessible via
+`BatchTransaction.messages()`. Also, all of them get committed when that instance of `app_function` finishes,
+just like a typical singleton run of `app_function`, so keep that in mind!
+
+You can tweak the default batch size via `NU_CONSUMER_DEFAULT_BATCH_CONSUME_MAX_COUNT`; it defaults to 1.
+
+Here is an example:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoBatchApp
+from cool_api import bulk_upload
+
+useless_schema = {
+    "name": "CoolSchema",
+    "type": "record",
+    "fields": [
+        {
+            "name": "cool_field",
+            "type": "string"
+        }
+    ]
+}
+
+def prep_data(msg_val):
+    # do stuff
+    pass
+
+def useless_example_func(bulk_transaction):
+    msgs = bulk_transaction.messages()
+    last_msg = msgs[-1]
+    # I originally set my max to 10 via the env var...but with some logic, if a message says I should do a bigger batch...
+    if last_msg.headers()['needs_bigger_bulk'] == 'true':
+        bulk_transaction.consume(consume_max_count=500)  # allow me to raise that max to 500, and consume up to that (will consume up to 490 more)!
+    bulk_upload([prep_data(msg.value()) for msg in bulk_transaction.messages()])  # push all 500 messages
+
+gtfo_app = GtfoBatchApp(
+    app_function=useless_example_func,
+    consume_topic='test_topic_in',
+    produce_topic_schema_dict={'cool_topic_out': useless_schema})
+gtfo_app.run()
+```
+
+
+### Using `GtfoTableApp` (plus `TableTransaction`)
+
+One of the more complex components of GTFO is the `GtfoTableApp`. It allows you to store state based on a
+kafka topic via a localized datastore. Basically, you can store whatever you want with respect to a given kafka key,
+and later reference/compare that data against a new version of that key.
+
+There are some functional caveats/limitations that come along with this feature set:
+- The app can only consume from one topic (you can make an upstream "funnel" app as needed).
+- You must make a topic named `{NU_APP_NAME}__changelog`, with the same partition count as the topic it
+would consume from.
+- You can only store/reference things based on the same key as the current message.
+- Each instance of your application needs to use the same volume.
+- Data needs to be stored as a json/dict (it is stored as a string and `json.loads`-ed at runtime).
+
+With that in mind, setup is almost exactly the same as `GtfoApp`; here is an example:
+
+```python
+from nubium_utils.confluent_utils.transaction_utils import GtfoTableApp
+
+useless_schema = {
+    "name": "CoolSchema",
+    "type": "record",
+    "fields": [
+        {
+            "name": "cool_field",
+            "type": "string"
+        }
+    ]
+}
+
+
+def useless_example_func(transaction):
+    msg = transaction.messages()  # can also do transaction.value(); uses the confluent-kafka method with single messages
+    cool_message_out = msg.value()
+    previous_message = transaction.read_table_entry()
+    # let's only do stuff if our previous version for the same key was "just cool"!
+    if previous_message['cool_field'] == 'just cool':
+        cool_message_out['cool_field'] = 'very cool now'
+        transaction.update_table_entry(cool_message_out)  # we want to update the table with our new value. It does not do this automatically!
+        transaction.produce({'topic': 'cool_topic_out', 'key': msg.key(), 'value': cool_message_out, 'headers': msg.headers()})
+
+
+gtfo_app = GtfoTableApp(
+    app_function=useless_example_func,
+    consume_topic='test_topic_in',
+    produce_topic_schema_dict={'cool_topic_out': useless_schema})
+gtfo_app.run()
+```
+
+### Extending `GtfoApp` and `Transaction`
+
+Of course, you can never cover _every_ use case! As such, each class was designed
+with extensibility in mind.
+
+Most often, what needs more customization is your consumption pattern (hence the batch class!),
+and there is a relatively painless way to address that with minimal alterations.
+
+There is an `__init__` argument on `GtfoApp` named `transaction_type`; this allows you to pass an
+augmented version of `Transaction` that implements your changed consumption pattern, potentially
+without touching the `GtfoApp` class at all!
+
+Otherwise, hopefully things have been compartmentalized enough that you can just replace methods as
+needed. In general, you'll need to mess a little with both classes, but likely mostly `Transaction`.
+
+## Monitoring
+The monitoring utils enable metrics to be surfaced from the kafka applications
+so the Prometheus server can scrape them.
+The Prometheus server can't dynamically figure out pod IPs and scrape the
+services directly, so we're using a metrics cache instead.
+
+The metrics cache is a StatefulSet with two services assigned to it.
+One service is a normal service, with a unique cluster IP address.
+The Prometheus server scrapes this service endpoint.
+The other service is headless (it has no cluster IP),
+which means that the monitoring utility can find the IP addresses of each
+of the backing pods and send metrics to all of them.
+This setup gives us high-availability guarantees.
+
+The monitoring utils are organized into two classes, `MetricsPusher` and `MetricsManager`.
+
+The `MetricsManager` is a container for all of the metrics for the app,
+and contains convenience methods for the three standardized metrics.
+These metrics are:
+- `messages_consumed`: The number of messages consumed by the app
+- `messages_produced`: The number of messages produced by the app
+- `message_errors`: The number of exceptions caught in the app (labeled by type)
+
+The `MetricsPusher` handles pushing the application's metrics to the metrics cache.
+It determines the list of IP addresses for all of the metrics cache pods,
+and sends the current values of all of the metrics.
+
+### Metric names and labels
+The names of the metrics in Prometheus are the same as their parameter names:
+- `messages_consumed`
+- `messages_produced`
+- `message_errors`
+
+Two labels exist for every metric:
+- `app`: The name of the microservice the metric came from
+- `job`: The name of the individual pod the metric came from
+
+The `message_errors` metric also has another label:
+- `exception`: The name of the exception that triggered the metric
+
+### Monitoring Setup Examples
+The initialization and update loop for application monitoring will differ
+from application to application based on their architecture.
+The following examples should cover the standard designs we use.
+
+#### Default Kafka Client Application
+A Kafka application that directly relies on interacting with Producer or
+Consumer clients should have its monitoring classes set up and its
+pushing thread started in the main run function and passed to the loop, as follows:
+```python
+import os
+
+from confluent_kafka import Consumer, Producer
+from nubium_utils.metrics import MetricsManager, MetricsPusher, start_pushing_metrics
+
+def run_function():
+
+    consumer = Consumer()
+    producer = Producer()
+
+    metrics_pusher = MetricsPusher(
+        os.environ['HOSTNAME'],
+        os.environ['METRICS_SERVICE_NAME'],
+        os.environ['METRICS_SERVICE_PORT'],
+        os.environ['METRICS_POD_PORT'])
+    metrics_manager = MetricsManager(job=os.environ['HOSTNAME'], app=os.environ['APP_NAME'], metrics_pusher=metrics_pusher)
+    start_pushing_metrics(metrics_manager, int(os.environ['METRICS_PUSH_RATE']))
+
+    try:
+        while True:
+            loop_function(consumer, producer, metrics_manager=metrics_manager)
+    finally:
+        consumer.close()
+```
+
+The `consume_message()` function from this library expects a metrics_manager object
+as an argument, so that it can increment the `messages_consumed` metric.
+
+The `messages_produced` metric needs to be incremented as necessary by the application
+itself whenever a Kafka message is produced. The convenience method on the metrics_manager,
+`inc_messages_produced()`, makes this easier,
+since it automatically adds the necessary labels to the metric.
+
+The application also needs to increment the `message_errors` metric
+whenever an exception is caught.
+
+An example loop function might look like this:
+```python
+import os
+import logging
+
+from nubium_utils import consume_message
+from nubium_utils.custom_exceptions import NoMessageError
+
+
+def loop_function(consumer, producer, metrics_manager):
+    try:
+        message = consume_message(consumer, int(os.environ['CONSUMER_POLL_TIMEOUT']), metrics_manager)
+        outgoing_key = message.value()['email_address']
+        producer.produce(topic='outgoing_topic', key=outgoing_key, value=message.value())
+        metrics_manager.inc_messages_produced(1)
+    except NoMessageError:
+        pass
+    except KeyError as error:
+        metrics_manager.inc_message_errors(error)
+        logging.debug('Message missing email address')
+```
+
+#### Flask Kafka Application
+The setup becomes a little different with a Flask application.
+The metrics_manager should be accessible through the app's configuration,
+so that it can be accessed in route functions.
+
+The preferred method for error monitoring is to hook into the built-in
+Flask error handling loop, using the `@app.errorhandler` decorator.
+Here is an example `create_app()` function:
+
+```python
+import flask
+from werkzeug.exceptions import HTTPException
+
+from .forms_producer_app import forms_producer
+from .util_blueprint import app_util_bp
+
+def create_app(config):
+    """
+    Creates app from config and needed blueprints
+    :param config: (Config) object used to configure the flask app
+    :return: (flask.Flask) the application object
+    """
+    app = flask.Flask(__name__)
+    app.config.from_object(config)
+
+    app.register_blueprint(forms_producer)
+    app.register_blueprint(app_util_bp)
+
+    @app.errorhandler(HTTPException)
+    def handle_exception(e):
+        """
+        Increment error gauge on metrics_manager before returning error message
+        """
+        response = e.get_response()
+        response.data = f'{e.code}:{e.name} - {e.description}'
+        app.config['MONITOR'].inc_message_errors(e)
+        return response
+
+    @app.errorhandler(Exception)
+    def unhandled_exception(error):
+        app.logger.error(f'Unhandled exception: {error}')
+        app.config['MONITOR'].inc_message_errors(error)
+        return f'Unhandled exception: {error}', 500
+
+    return app
+```
+
+The route functions for produced messages should increase the `messages_produced`
+metric when necessary.
+Example:
+```python
+from flask import current_app, jsonify, request
+
+# LOGGER, AUTH, and produce_message_callback are defined elsewhere in the app
+@forms_producer.route('/', methods=["POST"])
+@AUTH.login_required
+def handle_form():
+    """
+    Ingests a dynamic form from Eloqua and produces it to the topic
+    """
+    values = request.json
+    string_values = {key: str(value) for key, value in values.items()}
+    LOGGER.debug(f'Processing form: {values}')
+
+    current_app.config['PRODUCER'].produce(
+        topic=current_app.config['TOPIC'],
+        key=values['C_EmailAddress'],
+        value={'form_data': string_values},
+        on_delivery=produce_message_callback
+    )
+    current_app.config['MONITOR'].inc_messages_produced(1)
+
+    return jsonify(success=True)
+```
+
+
+%prep
+%autosetup -n nubium-utils-4.2.0
+
+%build
+%py3_build
+
+%install
+%py3_install
+install -d -m755 %{buildroot}/%{_pkgdocdir}
+if [ -d doc ]; then cp -arf doc %{buildroot}/%{_pkgdocdir}; fi
+if [ -d docs ]; then cp -arf docs %{buildroot}/%{_pkgdocdir}; fi
+if [ -d example ]; then cp -arf example %{buildroot}/%{_pkgdocdir}; fi
+if [ -d examples ]; then cp -arf examples %{buildroot}/%{_pkgdocdir}; fi
+pushd %{buildroot}
+if [ -d usr/lib ]; then
+	find usr/lib -type f -printf "/%h/%f\n" >> filelist.lst
+fi
+if [ -d usr/lib64 ]; then
+	find usr/lib64 -type f -printf "/%h/%f\n" >> filelist.lst
+fi
+if [ -d usr/bin ]; then
+	find usr/bin -type f -printf "/%h/%f\n" >> filelist.lst
+fi
+if [ -d usr/sbin ]; then
+	find usr/sbin -type f -printf "/%h/%f\n" >> filelist.lst
+fi
+touch doclist.lst
+if [ -d usr/share/man ]; then
+	find usr/share/man -type f -printf "/%h/%f.gz\n" >> doclist.lst
+fi
+popd
+mv %{buildroot}/filelist.lst .
+mv %{buildroot}/doclist.lst .
+
+%files -n python3-nubium-utils -f filelist.lst
+%dir %{python3_sitelib}/*
+
+%files help -f doclist.lst
+%{_docdir}/*
+
+%changelog
+* Fri May 05 2023 Python_Bot <Python_Bot@openeuler.org> - 4.2.0-1
+- Package Spec generated
