%global _empty_manifest_terminate_build 0
Name:           python-afkak
Version:        21.5.0.post0
Release:        1
Summary:        Twisted Python client for Apache Kafka
License:        Apache License 2.0
URL:            https://github.com/ciena/afkak
Source0:        https://mirrors.nju.edu.cn/pypi/web/packages/31/cb/a863f8c163440d0e8ae2283ab0284a1810f5cc661283c2c507036d185e17/afkak-21.5.0.post0.tar.gz
BuildArch:      noarch

Requires:       python3-attrs
Requires:       python3-Twisted
Requires:       python3-pyhash
Requires:       python3-snappy

%description
Afkak is a [Twisted](https://twistedmatrix.com/)-native
[Apache Kafka](https://kafka.apache.org/) client library. It provides
support for:

* Producing messages, with automatic batching and optional compression.
* Consuming messages, with group coordination and automatic commit.

Learn more in the **[documentation](https://afkak.readthedocs.io/en/latest/)**,
download [from PyPI](https://pypi.org/project/afkak/), or review the
[contribution guidelines](./CONTRIBUTING.md). Please report any issues
[on GitHub](https://github.com/ciena/afkak/issues).

# Status

Afkak supports these Pythons:

- CPython 3.5, 3.6, 3.7, 3.8, and 3.9
- PyPy3

We aim to support Kafka 1.1.x and later. Integration tests are run against
these Kafka broker versions:

- 0.9.0.1
- 1.1.1

Testing against 2.0.0 is planned (see
[#45](https://github.com/ciena/afkak/issues/45)).

Newer broker releases will generally function, but not all Afkak features
will work on older brokers. In particular, the coordinated consumer won’t
work before Kafka 0.9.0.1. We don’t recommend deploying such old releases
of Kafka anyway, as they have serious bugs.

# Usage

### High level

Note: This code is not meant to be runnable. See
[producer\_example](./producer_example) and
[consumer\_example](./consumer_example) for runnable example code.
```python
from afkak.client import KafkaClient
from afkak.consumer import Consumer
from afkak.producer import Producer
from afkak.common import (OFFSET_EARLIEST, PRODUCER_ACK_ALL_REPLICAS,
    PRODUCER_ACK_LOCAL_WRITE)

kClient = KafkaClient("localhost:9092")

# To send messages
producer = Producer(kClient)
d1 = producer.send_messages("my-topic", msgs=[b"some message"])
d2 = producer.send_messages("my-topic", msgs=[b"takes a list", b"of messages"])
# To get confirmations/errors on the sends, add callbacks to the returned deferreds
d1.addCallbacks(handleResponses, handleErrors)

# To wait for acknowledgements
# PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written to
#                            a local log before sending a response
#                            [ the default ]
# PRODUCER_ACK_ALL_REPLICAS : server will block until the message is committed
#                             by all in-sync replicas before sending a response
producer = Producer(kClient,
                    req_acks=Producer.PRODUCER_ACK_LOCAL_WRITE,
                    ack_timeout=2000)

responseD = producer.send_messages("my-topic", msgs=[b"message"])

# Using twisted's @inlineCallbacks:
responses = yield responseD
if responses:
    print(responses[0].error)
    print(responses[0].offset)

# To send messages in batch: you can use a producer with any of the
# partitioners for doing this. The following producer will collect
# messages into a batch and send them to Kafka after 20 messages are
# collected or every 60 seconds (whichever comes first). You can
# also batch by number of bytes.
#
# Notes:
# * If the producer dies before the messages are sent, the caller would
#   not have had the callbacks called on the deferreds returned from
#   send_messages(), and so can retry.
# * Calling producer.stop() before the messages are sent will
#   errback() the deferred(s) returned from the send_messages call(s)
producer = Producer(kClient, batch_send=True,
                    batch_send_every_n=20,
                    batch_send_every_t=60)
responseD1 = producer.send_messages("my-topic", msgs=[b"message"])
responseD2 = producer.send_messages("my-topic", msgs=[b"message 2"])

# To consume messages, define a function which takes a list of messages
# to process and possibly returns a deferred which fires when the
# processing is complete.
def processor_func(consumer, messages):
    # store_messages_in_database may return a deferred
    result = store_messages_in_database(messages)
    # record last processed message
    consumer.commit()
    return result

the_partition = 3  # Consume only from partition 3.
consumer = Consumer(kClient, "my-topic", the_partition, processor_func)
d = consumer.start(OFFSET_EARLIEST)  # Start reading at the earliest message
# The deferred returned by consumer.start() will fire when an error
# occurs that can't be handled by the consumer, or when consumer.stop()
# is called
yield d

consumer.stop()
kClient.close()
```

#### Keyed messages

```python
from afkak.client import KafkaClient
from afkak.producer import Producer
from afkak.partitioner import HashedPartitioner, RoundRobinPartitioner

kafka = KafkaClient("localhost:9092")

# Use the HashedPartitioner so that the producer will use the optional key
# argument on send_messages()
producer = Producer(kafka, partitioner_class=HashedPartitioner)
producer.send_messages("my-topic", "key1", [b"some message"])
producer.send_messages("my-topic", "key2", [b"this method"])
```

### Low level

```python
from afkak.client import KafkaClient

kafka = KafkaClient("localhost:9092")
req = ProduceRequest(topic="my-topic", partition=1,
    messages=[KafkaProtocol.encode_message(b"some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()

resps[0].topic      # b"my-topic"
resps[0].partition  # 1
resps[0].error      # 0 (hopefully)
resps[0].offset     # offset of the first message sent in this request
```

# Install

Afkak releases are [available on PyPI][afkak-pypi]. Because the Afkak
dependencies [Twisted][twisted] and [python-snappy][python-snappy] have
binary extension modules you will need to install the Python development
headers for the interpreter you wish to use:

[afkak-pypi]: https://pypi.python.org/pypi/afkak
[twisted]: https://pypi.python.org/pypi/Twisted
[python-snappy]: https://pypi.python.org/pypi/python-snappy
Debian/Ubuntu:

    sudo apt-get install build-essential python3-dev pypy3-dev libsnappy-dev
OS X:

    brew install python pypy snappy
    pip install virtualenv
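With the system packages in place, the pip step itself is ordinary; a minimal sketch (the virtualenv name `afkak-env` is arbitrary):

```shell
# Create and activate an isolated environment (name is arbitrary)
python3 -m venv afkak-env
. afkak-env/bin/activate
# Install Afkak and its dependencies from PyPI
pip install afkak
```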
Then Afkak can be [installed with pip as usual][pip-install]:

[pip-install]: https://packaging.python.org/en/latest/installing/

# License

Copyright 2013, 2014, 2015 David Arthur under Apache License, v2.0. See `LICENSE`

Copyright 2014, 2015 Cyan, Inc. under Apache License, v2.0. See `LICENSE`

Copyright 2015–2021 Ciena Corporation under Apache License, v2.0. See `LICENSE`

This project began as a port of the [kafka-python][kafka-python] library to
Twisted.

[kafka-python]: https://github.com/mumrah/kafka-python

See [AUTHORS.md](./AUTHORS.md) for the full contributor list.

%package -n python3-afkak
Summary:        Twisted Python client for Apache Kafka
Provides:       python-afkak
BuildRequires:  python3-devel
BuildRequires:  python3-setuptools
BuildRequires:  python3-pip

%description -n python3-afkak
Afkak is a [Twisted](https://twistedmatrix.com/)-native
[Apache Kafka](https://kafka.apache.org/) client library. It provides
support for:

* Producing messages, with automatic batching and optional compression.
* Consuming messages, with group coordination and automatic commit.

Learn more in the **[documentation](https://afkak.readthedocs.io/en/latest/)**,
download [from PyPI](https://pypi.org/project/afkak/), or review the
[contribution guidelines](./CONTRIBUTING.md). Please report any issues
[on GitHub](https://github.com/ciena/afkak/issues).

# Status

Afkak supports these Pythons:

- CPython 3.5, 3.6, 3.7, 3.8, and 3.9
- PyPy3

We aim to support Kafka 1.1.x and later. Integration tests are run against
these Kafka broker versions:

- 0.9.0.1
- 1.1.1

Testing against 2.0.0 is planned (see
[#45](https://github.com/ciena/afkak/issues/45)).

Newer broker releases will generally function, but not all Afkak features
will work on older brokers. In particular, the coordinated consumer won’t
work before Kafka 0.9.0.1. We don’t recommend deploying such old releases
of Kafka anyway, as they have serious bugs.

# Usage

### High level

Note: This code is not meant to be runnable.
See [producer\_example](./producer_example) and
[consumer\_example](./consumer_example) for runnable example code.

```python
from afkak.client import KafkaClient
from afkak.consumer import Consumer
from afkak.producer import Producer
from afkak.common import (OFFSET_EARLIEST, PRODUCER_ACK_ALL_REPLICAS,
    PRODUCER_ACK_LOCAL_WRITE)

kClient = KafkaClient("localhost:9092")

# To send messages
producer = Producer(kClient)
d1 = producer.send_messages("my-topic", msgs=[b"some message"])
d2 = producer.send_messages("my-topic", msgs=[b"takes a list", b"of messages"])
# To get confirmations/errors on the sends, add callbacks to the returned deferreds
d1.addCallbacks(handleResponses, handleErrors)

# To wait for acknowledgements
# PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written to
#                            a local log before sending a response
#                            [ the default ]
# PRODUCER_ACK_ALL_REPLICAS : server will block until the message is committed
#                             by all in-sync replicas before sending a response
producer = Producer(kClient,
                    req_acks=Producer.PRODUCER_ACK_LOCAL_WRITE,
                    ack_timeout=2000)

responseD = producer.send_messages("my-topic", msgs=[b"message"])

# Using twisted's @inlineCallbacks:
responses = yield responseD
if responses:
    print(responses[0].error)
    print(responses[0].offset)

# To send messages in batch: you can use a producer with any of the
# partitioners for doing this. The following producer will collect
# messages into a batch and send them to Kafka after 20 messages are
# collected or every 60 seconds (whichever comes first). You can
# also batch by number of bytes.
#
# Notes:
# * If the producer dies before the messages are sent, the caller would
#   not have had the callbacks called on the deferreds returned from
#   send_messages(), and so can retry.
# * Calling producer.stop() before the messages are sent will
#   errback() the deferred(s) returned from the send_messages call(s)
producer = Producer(kClient, batch_send=True,
                    batch_send_every_n=20,
                    batch_send_every_t=60)
responseD1 = producer.send_messages("my-topic", msgs=[b"message"])
responseD2 = producer.send_messages("my-topic", msgs=[b"message 2"])

# To consume messages, define a function which takes a list of messages
# to process and possibly returns a deferred which fires when the
# processing is complete.
def processor_func(consumer, messages):
    # store_messages_in_database may return a deferred
    result = store_messages_in_database(messages)
    # record last processed message
    consumer.commit()
    return result

the_partition = 3  # Consume only from partition 3.
consumer = Consumer(kClient, "my-topic", the_partition, processor_func)
d = consumer.start(OFFSET_EARLIEST)  # Start reading at the earliest message
# The deferred returned by consumer.start() will fire when an error
# occurs that can't be handled by the consumer, or when consumer.stop()
# is called
yield d

consumer.stop()
kClient.close()
```

#### Keyed messages

```python
from afkak.client import KafkaClient
from afkak.producer import Producer
from afkak.partitioner import HashedPartitioner, RoundRobinPartitioner

kafka = KafkaClient("localhost:9092")

# Use the HashedPartitioner so that the producer will use the optional key
# argument on send_messages()
producer = Producer(kafka, partitioner_class=HashedPartitioner)
producer.send_messages("my-topic", "key1", [b"some message"])
producer.send_messages("my-topic", "key2", [b"this method"])
```

### Low level

```python
from afkak.client import KafkaClient

kafka = KafkaClient("localhost:9092")
req = ProduceRequest(topic="my-topic", partition=1,
    messages=[KafkaProtocol.encode_message(b"some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()

resps[0].topic      # b"my-topic"
resps[0].partition  # 1
resps[0].error      # 0 (hopefully)
resps[0].offset     # offset of the first message sent in this request
```

# Install

Afkak releases are [available on PyPI][afkak-pypi]. Because the Afkak
dependencies [Twisted][twisted] and [python-snappy][python-snappy] have
binary extension modules you will need to install the Python development
headers for the interpreter you wish to use:

[afkak-pypi]: https://pypi.python.org/pypi/afkak
[twisted]: https://pypi.python.org/pypi/Twisted
[python-snappy]: https://pypi.python.org/pypi/python-snappy
Debian/Ubuntu:

    sudo apt-get install build-essential python3-dev pypy3-dev libsnappy-dev
OS X:

    brew install python pypy snappy
    pip install virtualenv
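With the system packages in place, the pip step itself is ordinary; a minimal sketch (the virtualenv name `afkak-env` is arbitrary):

```shell
# Create and activate an isolated environment (name is arbitrary)
python3 -m venv afkak-env
. afkak-env/bin/activate
# Install Afkak and its dependencies from PyPI
pip install afkak
```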
Then Afkak can be [installed with pip as usual][pip-install]:

[pip-install]: https://packaging.python.org/en/latest/installing/

# License

Copyright 2013, 2014, 2015 David Arthur under Apache License, v2.0. See `LICENSE`

Copyright 2014, 2015 Cyan, Inc. under Apache License, v2.0. See `LICENSE`

Copyright 2015–2021 Ciena Corporation under Apache License, v2.0. See `LICENSE`

This project began as a port of the [kafka-python][kafka-python] library to
Twisted.

[kafka-python]: https://github.com/mumrah/kafka-python

See [AUTHORS.md](./AUTHORS.md) for the full contributor list.

%package help
Summary:        Development documents and examples for afkak
Provides:       python3-afkak-doc

%description help
Afkak is a [Twisted](https://twistedmatrix.com/)-native
[Apache Kafka](https://kafka.apache.org/) client library. It provides
support for:

* Producing messages, with automatic batching and optional compression.
* Consuming messages, with group coordination and automatic commit.

Learn more in the **[documentation](https://afkak.readthedocs.io/en/latest/)**,
download [from PyPI](https://pypi.org/project/afkak/), or review the
[contribution guidelines](./CONTRIBUTING.md). Please report any issues
[on GitHub](https://github.com/ciena/afkak/issues).

# Status

Afkak supports these Pythons:

- CPython 3.5, 3.6, 3.7, 3.8, and 3.9
- PyPy3

We aim to support Kafka 1.1.x and later. Integration tests are run against
these Kafka broker versions:

- 0.9.0.1
- 1.1.1

Testing against 2.0.0 is planned (see
[#45](https://github.com/ciena/afkak/issues/45)).

Newer broker releases will generally function, but not all Afkak features
will work on older brokers. In particular, the coordinated consumer won’t
work before Kafka 0.9.0.1. We don’t recommend deploying such old releases
of Kafka anyway, as they have serious bugs.

# Usage

### High level

Note: This code is not meant to be runnable. See
[producer\_example](./producer_example) and
[consumer\_example](./consumer_example) for runnable example code.
```python
from afkak.client import KafkaClient
from afkak.consumer import Consumer
from afkak.producer import Producer
from afkak.common import (OFFSET_EARLIEST, PRODUCER_ACK_ALL_REPLICAS,
    PRODUCER_ACK_LOCAL_WRITE)

kClient = KafkaClient("localhost:9092")

# To send messages
producer = Producer(kClient)
d1 = producer.send_messages("my-topic", msgs=[b"some message"])
d2 = producer.send_messages("my-topic", msgs=[b"takes a list", b"of messages"])
# To get confirmations/errors on the sends, add callbacks to the returned deferreds
d1.addCallbacks(handleResponses, handleErrors)

# To wait for acknowledgements
# PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written to
#                            a local log before sending a response
#                            [ the default ]
# PRODUCER_ACK_ALL_REPLICAS : server will block until the message is committed
#                             by all in-sync replicas before sending a response
producer = Producer(kClient,
                    req_acks=Producer.PRODUCER_ACK_LOCAL_WRITE,
                    ack_timeout=2000)

responseD = producer.send_messages("my-topic", msgs=[b"message"])

# Using twisted's @inlineCallbacks:
responses = yield responseD
if responses:
    print(responses[0].error)
    print(responses[0].offset)

# To send messages in batch: you can use a producer with any of the
# partitioners for doing this. The following producer will collect
# messages into a batch and send them to Kafka after 20 messages are
# collected or every 60 seconds (whichever comes first). You can
# also batch by number of bytes.
#
# Notes:
# * If the producer dies before the messages are sent, the caller would
#   not have had the callbacks called on the deferreds returned from
#   send_messages(), and so can retry.
# * Calling producer.stop() before the messages are sent will
#   errback() the deferred(s) returned from the send_messages call(s)
producer = Producer(kClient, batch_send=True,
                    batch_send_every_n=20,
                    batch_send_every_t=60)
responseD1 = producer.send_messages("my-topic", msgs=[b"message"])
responseD2 = producer.send_messages("my-topic", msgs=[b"message 2"])

# To consume messages, define a function which takes a list of messages
# to process and possibly returns a deferred which fires when the
# processing is complete.
def processor_func(consumer, messages):
    # store_messages_in_database may return a deferred
    result = store_messages_in_database(messages)
    # record last processed message
    consumer.commit()
    return result

the_partition = 3  # Consume only from partition 3.
consumer = Consumer(kClient, "my-topic", the_partition, processor_func)
d = consumer.start(OFFSET_EARLIEST)  # Start reading at the earliest message
# The deferred returned by consumer.start() will fire when an error
# occurs that can't be handled by the consumer, or when consumer.stop()
# is called
yield d

consumer.stop()
kClient.close()
```

#### Keyed messages

```python
from afkak.client import KafkaClient
from afkak.producer import Producer
from afkak.partitioner import HashedPartitioner, RoundRobinPartitioner

kafka = KafkaClient("localhost:9092")

# Use the HashedPartitioner so that the producer will use the optional key
# argument on send_messages()
producer = Producer(kafka, partitioner_class=HashedPartitioner)
producer.send_messages("my-topic", "key1", [b"some message"])
producer.send_messages("my-topic", "key2", [b"this method"])
```

### Low level

```python
from afkak.client import KafkaClient

kafka = KafkaClient("localhost:9092")
req = ProduceRequest(topic="my-topic", partition=1,
    messages=[KafkaProtocol.encode_message(b"some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()

resps[0].topic      # b"my-topic"
resps[0].partition  # 1
resps[0].error      # 0 (hopefully)
resps[0].offset     # offset of the first message sent in this request
```

# Install

Afkak releases are [available on PyPI][afkak-pypi]. Because the Afkak
dependencies [Twisted][twisted] and [python-snappy][python-snappy] have
binary extension modules you will need to install the Python development
headers for the interpreter you wish to use:

[afkak-pypi]: https://pypi.python.org/pypi/afkak
[twisted]: https://pypi.python.org/pypi/Twisted
[python-snappy]: https://pypi.python.org/pypi/python-snappy
Debian/Ubuntu:

    sudo apt-get install build-essential python3-dev pypy3-dev libsnappy-dev
OS X:

    brew install python pypy snappy
    pip install virtualenv
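With the system packages in place, the pip step itself is ordinary; a minimal sketch (the virtualenv name `afkak-env` is arbitrary):

```shell
# Create and activate an isolated environment (name is arbitrary)
python3 -m venv afkak-env
. afkak-env/bin/activate
# Install Afkak and its dependencies from PyPI
pip install afkak
```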
Then Afkak can be [installed with pip as usual][pip-install]:

[pip-install]: https://packaging.python.org/en/latest/installing/

# License

Copyright 2013, 2014, 2015 David Arthur under Apache License, v2.0. See `LICENSE`

Copyright 2014, 2015 Cyan, Inc. under Apache License, v2.0. See `LICENSE`

Copyright 2015–2021 Ciena Corporation under Apache License, v2.0. See `LICENSE`

This project began as a port of the [kafka-python][kafka-python] library to
Twisted.

[kafka-python]: https://github.com/mumrah/kafka-python

See [AUTHORS.md](./AUTHORS.md) for the full contributor list.

%prep
%autosetup -n afkak-21.5.0.post0

%build
%py3_build

%install
%py3_install
install -d -m755 %{buildroot}/%{_pkgdocdir}
if [ -d doc ]; then cp -arf doc %{buildroot}/%{_pkgdocdir}; fi
if [ -d docs ]; then cp -arf docs %{buildroot}/%{_pkgdocdir}; fi
if [ -d example ]; then cp -arf example %{buildroot}/%{_pkgdocdir}; fi
if [ -d examples ]; then cp -arf examples %{buildroot}/%{_pkgdocdir}; fi
pushd %{buildroot}
if [ -d usr/lib ]; then
    find usr/lib -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/lib64 ]; then
    find usr/lib64 -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/bin ]; then
    find usr/bin -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/sbin ]; then
    find usr/sbin -type f -printf "/%h/%f\n" >> filelist.lst
fi
touch doclist.lst
if [ -d usr/share/man ]; then
    find usr/share/man -type f -printf "/%h/%f.gz\n" >> doclist.lst
fi
popd
mv %{buildroot}/filelist.lst .
mv %{buildroot}/doclist.lst .

%files -n python3-afkak -f filelist.lst
%dir %{python3_sitelib}/*

%files help -f doclist.lst
%{_docdir}/*

%changelog
* Wed May 10 2023 Python_Bot - 21.5.0.post0-1
- Package Spec generated