%global _empty_manifest_terminate_build 0
Name: python-async-kinesis-client
Version: 0.2.14
Release: 1
Summary: Asynchronous Python client for AWS Kinesis
License: MIT
URL: https://github.com/whale2/async-kinesis-client
Source0: https://mirrors.aliyun.com/pypi/web/packages/b0/5e/e2d9b293c8cd76605beba210241b54badc8e33c1e46dd584a5681e0c0247/async-kinesis-client-0.2.14.tar.gz
BuildArch: noarch

Requires: python3-aioboto3
Requires: python3-multidict

%description
# async-kinesis-client
Python Kinesis Client library utilising asyncio

Based on the Kinesis-Python project by Evan Borgstrom
https://github.com/NerdWalletOSS/kinesis-python but with asyncio magic

The problem with Kinesis-Python is that all the data ends up in a single thread
and is checkpointed from there, so despite having many processes, the client
is clogged by checkpointing. Besides, it checkpoints every single record and this is not configurable.

This client is based on the aioboto3 library and uses Python 3.6+ async methods.

Usage:

```python
import asyncio
from async_kinesis_client.kinesis_consumer import AsyncKinesisConsumer

async def read_stream():

    # This is a coroutine that reads all the records from a shard
    async def read_records(shard_reader):
        async for records in shard_reader.get_records():
            for r in records:
                print('Shard: {}; Record: {}'.format(shard_reader.shard_id, r))

    consumer = AsyncKinesisConsumer(
        stream_name='my-stream',
        checkpoint_table='my-checkpoint-table')

    # consumer will yield existing shards and will continue yielding
    # new shards if re-sharding happens
    async for shard_reader in consumer.get_shard_readers():
        print('Got shard reader for shard id: {}'.format(shard_reader.shard_id))
        asyncio.ensure_future(read_records(shard_reader))

asyncio.get_event_loop().run_until_complete(read_stream())
```

*AsyncShardReader* and *AsyncKinesisConsumer* can be stopped from a parallel coroutine by calling the *stop()* method; the consumer will stop all shard readers in that case.
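
Stopping from a parallel coroutine could look roughly like the sketch below. The helper name `stop_after` and the delay are illustrative assumptions, not part of the library; the only library call used is *stop()*. It is not stated here whether *stop()* is a plain method or a coroutine, so the sketch awaits the result only when it is awaitable.

```python
import asyncio
import inspect


async def stop_after(stoppable, delay_seconds):
    """Wait delay_seconds, then call stop() on a consumer or shard reader.

    Hypothetical helper: 'stoppable' is any object exposing stop().
    """
    await asyncio.sleep(delay_seconds)
    result = stoppable.stop()
    # Assumption: stop() may or may not be a coroutine, so only await
    # the returned value if it is awaitable.
    if inspect.isawaitable(result):
        await result
```

Inside `read_stream()` from the usage example, this could be scheduled with `asyncio.ensure_future(stop_after(consumer, 60))` before the `async for` loop starts.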
If you want to be notified of shard closing, catch *ShardClosedException* while reading records.

*AsyncShardReader* exposes the property millis_behind_latest, which could be useful for determining application performance.

*AsyncKinesisConsumer* has the following configuration methods:

*set_checkpoint_interval(records)* - how many records to skip before checkpointing

*set_lock_duration(time)* - how many seconds to hold the lock. The consumer will attempt to refresh the lock before that time

*set_reader_sleep_time(time)* - how long the shard reader should wait (in seconds, fractions possible) if it did not receive any records from the Kinesis stream

*set_checkpoint_callback(coro)* - set a callback coroutine to be called before checkpointing the next batch of records. Coroutine arguments: *ShardId*, *SequenceNumber*

Producer is rather trivial:

```python
from async_kinesis_client.kinesis_producer import AsyncKinesisProducer

# ...

async def write_stream():
    producer = AsyncKinesisProducer(
        stream_name='my-stream',
        ordered=True
    )

    await producer.put_record(
        record=b'bytes',
        partition_key='string',     # optional, if none, default time-based key is used
        explicit_hash_key='string'  # optional
    )
```

Sending multiple records at once:

```python
from async_kinesis_client.kinesis_producer import AsyncKinesisProducer

# ...

async def write_stream():
    producer = AsyncKinesisProducer(
        stream_name='my-stream',
        ordered=True
    )

    records = [
        {
            'Data': b'bytes',
            'PartitionKey': 'string',    # optional, if none, default time-based key is used
            'ExplicitHashKey': 'string'  # optional
        },
        ...
    ]

    response = await producer.put_records(
        records=records
    )

    # See boto3 docs for response structure:
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records
```

AWS authentication.
For testing outside the AWS cloud, especially when Multi-Factor Authentication is in use, I find the following snippet extremely useful:

```python
import os
import aioboto3
from botocore import credentials
from aiobotocore import AioSession

working_dir = os.path.join(os.path.expanduser('~'), '.aws/cli/cache')
session = AioSession(profile=os.environ.get('AWS_PROFILE'))
provider = session.get_component('credential_provider').get_provider('assume-role')
provider.cache = credentials.JSONFileCache(working_dir)

aioboto3.setup_default_session(botocore_session=session)
```

This allows re-using the cached session token after completing any aws command under *awsudo*; all you need is to set the AWS_PROFILE environment variable.

The library is currently not tested enough for different network events. Use it at your own risk, you've been warned.

%package -n python3-async-kinesis-client
Summary: Asynchronous Python client for AWS Kinesis
Provides: python-async-kinesis-client
BuildRequires: python3-devel
BuildRequires: python3-setuptools
BuildRequires: python3-pip
%description -n python3-async-kinesis-client
# async-kinesis-client
Python Kinesis Client library utilising asyncio

Based on the Kinesis-Python project by Evan Borgstrom
https://github.com/NerdWalletOSS/kinesis-python but with asyncio magic

The problem with Kinesis-Python is that all the data ends up in a single thread
and is checkpointed from there, so despite having many processes, the client
is clogged by checkpointing. Besides, it checkpoints every single record and this is not configurable.

This client is based on the aioboto3 library and uses Python 3.6+ async methods.

Usage:

```python
import asyncio
from async_kinesis_client.kinesis_consumer import AsyncKinesisConsumer

async def read_stream():

    # This is a coroutine that reads all the records from a shard
    async def read_records(shard_reader):
        async for records in shard_reader.get_records():
            for r in records:
                print('Shard: {}; Record: {}'.format(shard_reader.shard_id, r))

    consumer = AsyncKinesisConsumer(
        stream_name='my-stream',
        checkpoint_table='my-checkpoint-table')

    # consumer will yield existing shards and will continue yielding
    # new shards if re-sharding happens
    async for shard_reader in consumer.get_shard_readers():
        print('Got shard reader for shard id: {}'.format(shard_reader.shard_id))
        asyncio.ensure_future(read_records(shard_reader))

asyncio.get_event_loop().run_until_complete(read_stream())
```

*AsyncShardReader* and *AsyncKinesisConsumer* can be stopped from a parallel coroutine by calling the *stop()* method; the consumer will stop all shard readers in that case.
If you want to be notified of shard closing, catch *ShardClosedException* while reading records.

*AsyncShardReader* exposes the property millis_behind_latest, which could be useful for determining application performance.

*AsyncKinesisConsumer* has the following configuration methods:

*set_checkpoint_interval(records)* - how many records to skip before checkpointing

*set_lock_duration(time)* - how many seconds to hold the lock. The consumer will attempt to refresh the lock before that time

*set_reader_sleep_time(time)* - how long the shard reader should wait (in seconds, fractions possible) if it did not receive any records from the Kinesis stream

*set_checkpoint_callback(coro)* - set a callback coroutine to be called before checkpointing the next batch of records. Coroutine arguments: *ShardId*, *SequenceNumber*

Producer is rather trivial:

```python
from async_kinesis_client.kinesis_producer import AsyncKinesisProducer

# ...

async def write_stream():
    producer = AsyncKinesisProducer(
        stream_name='my-stream',
        ordered=True
    )

    await producer.put_record(
        record=b'bytes',
        partition_key='string',     # optional, if none, default time-based key is used
        explicit_hash_key='string'  # optional
    )
```

Sending multiple records at once:

```python
from async_kinesis_client.kinesis_producer import AsyncKinesisProducer

# ...

async def write_stream():
    producer = AsyncKinesisProducer(
        stream_name='my-stream',
        ordered=True
    )

    records = [
        {
            'Data': b'bytes',
            'PartitionKey': 'string',    # optional, if none, default time-based key is used
            'ExplicitHashKey': 'string'  # optional
        },
        ...
    ]

    response = await producer.put_records(
        records=records
    )

    # See boto3 docs for response structure:
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records
```

AWS authentication.
For testing outside the AWS cloud, especially when Multi-Factor Authentication is in use, I find the following snippet extremely useful:

```python
import os
import aioboto3
from botocore import credentials
from aiobotocore import AioSession

working_dir = os.path.join(os.path.expanduser('~'), '.aws/cli/cache')
session = AioSession(profile=os.environ.get('AWS_PROFILE'))
provider = session.get_component('credential_provider').get_provider('assume-role')
provider.cache = credentials.JSONFileCache(working_dir)

aioboto3.setup_default_session(botocore_session=session)
```

This allows re-using the cached session token after completing any aws command under *awsudo*; all you need is to set the AWS_PROFILE environment variable.

The library is currently not tested enough for different network events. Use it at your own risk, you've been warned.
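
The configuration methods listed earlier in this description could be applied in one place, along the lines of the sketch below. The helper `configure_consumer`, the callback `log_checkpoint` and all default values are illustrative assumptions; only the four `set_*` method names come from the list above. The sketch also assumes the callback receives the shard id and sequence number as two positional arguments in that order.

```python
async def log_checkpoint(shard_id, sequence_number):
    """Hypothetical callback, run before each checkpoint.

    Assumption: the two values are passed positionally in this order.
    """
    print('Checkpointing shard {} at {}'.format(shard_id, sequence_number))


def configure_consumer(consumer, checkpoint_every=100, lock_seconds=30,
                       idle_sleep=0.5):
    """Apply the documented setters to a consumer; values are examples only."""
    consumer.set_checkpoint_interval(checkpoint_every)  # records between checkpoints
    consumer.set_lock_duration(lock_seconds)            # seconds to hold the lock
    consumer.set_reader_sleep_time(idle_sleep)          # seconds to wait when idle
    consumer.set_checkpoint_callback(log_checkpoint)    # coroutine called before checkpointing
    return consumer
```

Calling `configure_consumer(consumer)` right after constructing *AsyncKinesisConsumer* and before iterating over `get_shard_readers()` keeps all the tuning in one spot.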

%package help
Summary: Development documents and examples for async-kinesis-client
Provides: python3-async-kinesis-client-doc
%description help
# async-kinesis-client
Python Kinesis Client library utilising asyncio

Based on the Kinesis-Python project by Evan Borgstrom
https://github.com/NerdWalletOSS/kinesis-python but with asyncio magic

The problem with Kinesis-Python is that all the data ends up in a single thread
and is checkpointed from there, so despite having many processes, the client
is clogged by checkpointing. Besides, it checkpoints every single record and this is not configurable.

This client is based on the aioboto3 library and uses Python 3.6+ async methods.

Usage:

```python
import asyncio
from async_kinesis_client.kinesis_consumer import AsyncKinesisConsumer

async def read_stream():

    # This is a coroutine that reads all the records from a shard
    async def read_records(shard_reader):
        async for records in shard_reader.get_records():
            for r in records:
                print('Shard: {}; Record: {}'.format(shard_reader.shard_id, r))

    consumer = AsyncKinesisConsumer(
        stream_name='my-stream',
        checkpoint_table='my-checkpoint-table')

    # consumer will yield existing shards and will continue yielding
    # new shards if re-sharding happens
    async for shard_reader in consumer.get_shard_readers():
        print('Got shard reader for shard id: {}'.format(shard_reader.shard_id))
        asyncio.ensure_future(read_records(shard_reader))

asyncio.get_event_loop().run_until_complete(read_stream())
```

*AsyncShardReader* and *AsyncKinesisConsumer* can be stopped from a parallel coroutine by calling the *stop()* method; the consumer will stop all shard readers in that case.
If you want to be notified of shard closing, catch *ShardClosedException* while reading records.

*AsyncShardReader* exposes the property millis_behind_latest, which could be useful for determining application performance.
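
One way to use millis_behind_latest is a simple lag check, sketched below. The function name `is_lagging` and the one-minute threshold are illustrative assumptions; the sketch also assumes the property may be None before the first batch has been read.

```python
def is_lagging(shard_reader, threshold_ms=60000):
    """Return True if the reader is more than threshold_ms behind the stream tip.

    Hypothetical helper: works with any object exposing millis_behind_latest.
    """
    lag = shard_reader.millis_behind_latest
    # Assumption: treat a missing value as "not lagging" rather than failing.
    return lag is not None and lag > threshold_ms
```

This could be called inside `read_records()` after each batch to log a warning when a shard falls behind.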

*AsyncKinesisConsumer* has the following configuration methods:

*set_checkpoint_interval(records)* - how many records to skip before checkpointing

*set_lock_duration(time)* - how many seconds to hold the lock. The consumer will attempt to refresh the lock before that time

*set_reader_sleep_time(time)* - how long the shard reader should wait (in seconds, fractions possible) if it did not receive any records from the Kinesis stream

*set_checkpoint_callback(coro)* - set a callback coroutine to be called before checkpointing the next batch of records. Coroutine arguments: *ShardId*, *SequenceNumber*

Producer is rather trivial:

```python
from async_kinesis_client.kinesis_producer import AsyncKinesisProducer

# ...

async def write_stream():
    producer = AsyncKinesisProducer(
        stream_name='my-stream',
        ordered=True
    )

    await producer.put_record(
        record=b'bytes',
        partition_key='string',     # optional, if none, default time-based key is used
        explicit_hash_key='string'  # optional
    )
```

Sending multiple records at once:

```python
from async_kinesis_client.kinesis_producer import AsyncKinesisProducer

# ...

async def write_stream():
    producer = AsyncKinesisProducer(
        stream_name='my-stream',
        ordered=True
    )

    records = [
        {
            'Data': b'bytes',
            'PartitionKey': 'string',    # optional, if none, default time-based key is used
            'ExplicitHashKey': 'string'  # optional
        },
        ...
    ]

    response = await producer.put_records(
        records=records
    )

    # See boto3 docs for response structure:
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records
```

AWS authentication.
For testing outside the AWS cloud, especially when Multi-Factor Authentication is in use, I find the following snippet extremely useful:

```python
import os
import aioboto3
from botocore import credentials
from aiobotocore import AioSession

working_dir = os.path.join(os.path.expanduser('~'), '.aws/cli/cache')
session = AioSession(profile=os.environ.get('AWS_PROFILE'))
provider = session.get_component('credential_provider').get_provider('assume-role')
provider.cache = credentials.JSONFileCache(working_dir)

aioboto3.setup_default_session(botocore_session=session)
```

This allows re-using the cached session token after completing any aws command under *awsudo*; all you need is to set the AWS_PROFILE environment variable.

The library is currently not tested enough for different network events. Use it at your own risk, you've been warned.

%prep
%autosetup -n async-kinesis-client-0.2.14

%build
%py3_build

%install
%py3_install
install -d -m755 %{buildroot}/%{_pkgdocdir}
if [ -d doc ]; then cp -arf doc %{buildroot}/%{_pkgdocdir}; fi
if [ -d docs ]; then cp -arf docs %{buildroot}/%{_pkgdocdir}; fi
if [ -d example ]; then cp -arf example %{buildroot}/%{_pkgdocdir}; fi
if [ -d examples ]; then cp -arf examples %{buildroot}/%{_pkgdocdir}; fi
pushd %{buildroot}
if [ -d usr/lib ]; then
	find usr/lib -type f -printf "\"/%h/%f\"\n" >> filelist.lst
fi
if [ -d usr/lib64 ]; then
	find usr/lib64 -type f -printf "\"/%h/%f\"\n" >> filelist.lst
fi
if [ -d usr/bin ]; then
	find usr/bin -type f -printf "\"/%h/%f\"\n" >> filelist.lst
fi
if [ -d usr/sbin ]; then
	find usr/sbin -type f -printf "\"/%h/%f\"\n" >> filelist.lst
fi
touch doclist.lst
if [ -d usr/share/man ]; then
	find usr/share/man -type f -printf "\"/%h/%f.gz\"\n" >> doclist.lst
fi
popd
mv %{buildroot}/filelist.lst .
mv %{buildroot}/doclist.lst .

%files -n python3-async-kinesis-client -f filelist.lst
%dir %{python3_sitelib}/*

%files help -f doclist.lst
%{_docdir}/*

%changelog
* Fri Jun 09 2023 Python_Bot - 0.2.14-1
- Package Spec generated