author     CoprDistGit <infra@openeuler.org>    2023-05-31 05:26:09 +0000
committer  CoprDistGit <infra@openeuler.org>    2023-05-31 05:26:09 +0000
commit     e56923ad7c2f3494f716502f08958b0ab933d46d (patch)
tree       9b18923e7aa8b0a4ae349aad6e3f203a9d84db1e /python-async-kinesis-client.spec
parent     fb0eacb6d79a7b8c880d33e706d6084cd6d83167 (diff)
automatic import of python-async-kinesis-client
Diffstat (limited to 'python-async-kinesis-client.spec')
-rw-r--r--  python-async-kinesis-client.spec  464
1 file changed, 464 insertions, 0 deletions
diff --git a/python-async-kinesis-client.spec b/python-async-kinesis-client.spec
new file mode 100644
index 0000000..929510e
--- /dev/null
+++ b/python-async-kinesis-client.spec
@@ -0,0 +1,464 @@
+%global _empty_manifest_terminate_build 0
+Name: python-async-kinesis-client
+Version: 0.2.14
+Release: 1
+Summary: Asynchronous Python client for AWS Kinesis
+License: MIT
+URL: https://github.com/whale2/async-kinesis-client
+Source0: https://mirrors.nju.edu.cn/pypi/web/packages/b0/5e/e2d9b293c8cd76605beba210241b54badc8e33c1e46dd584a5681e0c0247/async-kinesis-client-0.2.14.tar.gz
+BuildArch: noarch
+
+Requires: python3-aioboto3
+Requires: python3-multidict
+
+%description
+# async-kinesis-client
+Python Kinesis Client library utilising asyncio
+
+Based on the Kinesis-Python project by Evan Borgstrom <eborgstrom@nerdwallet.com>
+(https://github.com/NerdWalletOSS/kinesis-python), but with asyncio magic.
+
+The problem with Kinesis-Python is that all the data ends up in a single thread
+and is checkpointed from there, so despite having many processes, the client
+is clogged by checkpointing. Besides, it checkpoints every single record, and this is
+not configurable.
+
+This client is based on the aioboto3 library and uses Python 3.6+ async methods.
+
+Usage:
+
+```python
+import asyncio
+from async_kinesis_client.kinesis_consumer import AsyncKinesisConsumer
+
+async def read_stream():
+
+ # This is a coroutine that reads all the records from a shard
+ async def read_records(shard_reader):
+ async for records in shard_reader.get_records():
+ for r in records:
+ print('Shard: {}; Record: {}'.format(shard_reader.shard_id, r))
+
+ consumer = AsyncKinesisConsumer(
+ stream_name='my-stream',
+ checkpoint_table='my-checkpoint-table')
+
+ # consumer will yield existing shards and will continue yielding
+ # new shards if re-sharding happens
+ async for shard_reader in consumer.get_shard_readers():
+ print('Got shard reader for shard id: {}'.format(shard_reader.shard_id))
+ asyncio.ensure_future(read_records(shard_reader))
+
+asyncio.get_event_loop().run_until_complete(read_stream())
+
+```
+
+*AsyncShardReader* and *AsyncKinesisConsumer* can be stopped from a parallel coroutine by calling the *stop()* method;
+the consumer will stop all of its shard readers in that case.
+If you want to be notified of shard closing, catch *ShardClosedException* while reading records.
+
+*AsyncShardReader* exposes the property *millis_behind_latest*, which can be useful for gauging application performance.
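+
+Below is a minimal sketch of these three facilities together. It assumes *ShardClosedException* is importable from the consumer module (adjust the import to wherever your installed version exposes it), and it shows *stop()* as a plain call; await it instead if your version defines it as a coroutine.
+
+```python
+import asyncio
+from async_kinesis_client.kinesis_consumer import AsyncKinesisConsumer
+# Assumption: ShardClosedException lives next to the consumer class; adjust the
+# import if your installed version exposes it from a different module.
+from async_kinesis_client.kinesis_consumer import ShardClosedException
+
+async def read_records(shard_reader):
+    try:
+        async for records in shard_reader.get_records():
+            # millis_behind_latest shows how far this reader lags behind the tip of the shard
+            print('Shard {} is {} ms behind latest'.format(
+                shard_reader.shard_id, shard_reader.millis_behind_latest))
+            for r in records:
+                print('Record: {}'.format(r))
+    except ShardClosedException:
+        # The shard was closed (e.g. after re-sharding); the consumer will keep
+        # yielding readers for any child shards.
+        print('Shard {} closed'.format(shard_reader.shard_id))
+
+async def stop_after(consumer, delay):
+    # A parallel coroutine that stops the consumer; the consumer then stops
+    # all of its shard readers.
+    await asyncio.sleep(delay)
+    consumer.stop()
+```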
+
+*AsyncKinesisConsumer* has the following configuration methods (a usage sketch follows the list):
+
+*set_checkpoint_interval(records)* - how many records to skip before checkpointing
+
+*set_lock_duration(time)* - how many seconds to hold the lock. The consumer will attempt to refresh the lock before that time elapses
+
+*set_reader_sleep_time(time)* - how long the shard reader should wait (in seconds, fractions allowed) if it did not receive any records from the Kinesis stream
+
+*set_checkpoint_callback(coro)* - set a callback coroutine to be called before checkpointing the next batch of records. Coroutine arguments: *ShardId*, *SequenceNumber*
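+
+A hedged sketch of wiring these setters together (the numeric values are arbitrary examples, and the callback parameter names are illustrative; the callback receives the shard id and sequence number described above):
+
+```python
+from async_kinesis_client.kinesis_consumer import AsyncKinesisConsumer
+
+async def on_checkpoint(shard_id, sequence_number):
+    # Called before the next batch of records is checkpointed
+    print('Checkpointing shard {} at sequence {}'.format(shard_id, sequence_number))
+
+consumer = AsyncKinesisConsumer(
+    stream_name='my-stream',
+    checkpoint_table='my-checkpoint-table')
+
+consumer.set_checkpoint_interval(100)  # checkpoint after every 100 records (example value)
+consumer.set_lock_duration(30)         # hold the shard lock for 30 seconds (example value)
+consumer.set_reader_sleep_time(0.5)    # wait 0.5 s when a read returns no records
+consumer.set_checkpoint_callback(on_checkpoint)
+```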
+
+The producer is rather trivial:
+
+```python
+from async_kinesis_client.kinesis_producer import AsyncKinesisProducer
+
+# ...
+
+async def write_stream():
+ producer = AsyncKinesisProducer(
+ stream_name='my-stream',
+ ordered=True
+ )
+
+ await producer.put_record(
+ record=b'bytes',
+ partition_key='string', # optional, if none, default time-based key is used
+ explicit_hash_key='string' # optional
+ )
+
+```
+
+Sending multiple records at once:
+
+```python
+from async_kinesis_client.kinesis_producer import AsyncKinesisProducer
+
+# ...
+
+async def write_stream():
+ producer = AsyncKinesisProducer(
+ stream_name='my-stream',
+ ordered=True
+ )
+
+ records = [
+ {
+ 'Data': b'bytes',
+ 'PartitionKey': 'string', # optional, if none, default time-based key is used
+ 'ExplicitHashKey': 'string' # optional
+ },
+ ...
+ ]
+
+ response = await producer.put_records(
+ records=records
+ )
+
+ # See boto3 docs for response structure:
+ # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records
+```
+
+
+AWS authentication: for testing outside the AWS cloud, especially when Multi-Factor Authentication is in use, I find the following snippet extremely useful:
+```python
+import os
+import aioboto3
+from botocore import credentials
+from aiobotocore import AioSession
+
+working_dir = os.path.join(os.path.expanduser('~'), '.aws/cli/cache')
+session = AioSession(profile=os.environ.get('AWS_PROFILE'))
+provider = session.get_component('credential_provider').get_provider('assume-role')
+provider.cache = credentials.JSONFileCache(working_dir)
+aioboto3.setup_default_session(botocore_session=session)
+
+```
+
+This allows re-using a cached session token after completing any aws command under *awsudo*; all you need to do is set the AWS_PROFILE environment variable.
+
+Currently the library has not been tested enough against various network events.
+Use it at your own risk; you've been warned.
+
+
+
+
+%package -n python3-async-kinesis-client
+Summary: Asynchronous Python client for AWS Kinesis
+Provides: python-async-kinesis-client
+BuildRequires: python3-devel
+BuildRequires: python3-setuptools
+BuildRequires: python3-pip
+%description -n python3-async-kinesis-client
+# async-kinesis-client
+Python Kinesis Client library utilising asyncio
+
+Based on the Kinesis-Python project by Evan Borgstrom <eborgstrom@nerdwallet.com>
+(https://github.com/NerdWalletOSS/kinesis-python), but with asyncio magic.
+
+The problem with Kinesis-Python is that all the data ends up in a single thread
+and is checkpointed from there, so despite having many processes, the client
+is clogged by checkpointing. Besides, it checkpoints every single record, and this is
+not configurable.
+
+This client is based on the aioboto3 library and uses Python 3.6+ async methods.
+
+Usage:
+
+```python
+import asyncio
+from async_kinesis_client.kinesis_consumer import AsyncKinesisConsumer
+
+async def read_stream():
+
+ # This is a coroutine that reads all the records from a shard
+ async def read_records(shard_reader):
+ async for records in shard_reader.get_records():
+ for r in records:
+ print('Shard: {}; Record: {}'.format(shard_reader.shard_id, r))
+
+ consumer = AsyncKinesisConsumer(
+ stream_name='my-stream',
+ checkpoint_table='my-checkpoint-table')
+
+ # consumer will yield existing shards and will continue yielding
+ # new shards if re-sharding happens
+ async for shard_reader in consumer.get_shard_readers():
+ print('Got shard reader for shard id: {}'.format(shard_reader.shard_id))
+ asyncio.ensure_future(read_records(shard_reader))
+
+asyncio.get_event_loop().run_until_complete(read_stream())
+
+```
+
+*AsyncShardReader* and *AsyncKinesisConsumer* can be stopped from a parallel coroutine by calling the *stop()* method;
+the consumer will stop all of its shard readers in that case.
+If you want to be notified of shard closing, catch *ShardClosedException* while reading records.
+
+*AsyncShardReader* exposes the property *millis_behind_latest*, which can be useful for gauging application performance.
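+
+Below is a minimal sketch of these three facilities together. It assumes *ShardClosedException* is importable from the consumer module (adjust the import to wherever your installed version exposes it), and it shows *stop()* as a plain call; await it instead if your version defines it as a coroutine.
+
+```python
+import asyncio
+from async_kinesis_client.kinesis_consumer import AsyncKinesisConsumer
+# Assumption: ShardClosedException lives next to the consumer class; adjust the
+# import if your installed version exposes it from a different module.
+from async_kinesis_client.kinesis_consumer import ShardClosedException
+
+async def read_records(shard_reader):
+    try:
+        async for records in shard_reader.get_records():
+            # millis_behind_latest shows how far this reader lags behind the tip of the shard
+            print('Shard {} is {} ms behind latest'.format(
+                shard_reader.shard_id, shard_reader.millis_behind_latest))
+            for r in records:
+                print('Record: {}'.format(r))
+    except ShardClosedException:
+        # The shard was closed (e.g. after re-sharding); the consumer will keep
+        # yielding readers for any child shards.
+        print('Shard {} closed'.format(shard_reader.shard_id))
+
+async def stop_after(consumer, delay):
+    # A parallel coroutine that stops the consumer; the consumer then stops
+    # all of its shard readers.
+    await asyncio.sleep(delay)
+    consumer.stop()
+```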
+
+*AsyncKinesisConsumer* has the following configuration methods (a usage sketch follows the list):
+
+*set_checkpoint_interval(records)* - how many records to skip before checkpointing
+
+*set_lock_duration(time)* - how many seconds to hold the lock. The consumer will attempt to refresh the lock before that time elapses
+
+*set_reader_sleep_time(time)* - how long the shard reader should wait (in seconds, fractions allowed) if it did not receive any records from the Kinesis stream
+
+*set_checkpoint_callback(coro)* - set a callback coroutine to be called before checkpointing the next batch of records. Coroutine arguments: *ShardId*, *SequenceNumber*
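+
+A hedged sketch of wiring these setters together (the numeric values are arbitrary examples, and the callback parameter names are illustrative; the callback receives the shard id and sequence number described above):
+
+```python
+from async_kinesis_client.kinesis_consumer import AsyncKinesisConsumer
+
+async def on_checkpoint(shard_id, sequence_number):
+    # Called before the next batch of records is checkpointed
+    print('Checkpointing shard {} at sequence {}'.format(shard_id, sequence_number))
+
+consumer = AsyncKinesisConsumer(
+    stream_name='my-stream',
+    checkpoint_table='my-checkpoint-table')
+
+consumer.set_checkpoint_interval(100)  # checkpoint after every 100 records (example value)
+consumer.set_lock_duration(30)         # hold the shard lock for 30 seconds (example value)
+consumer.set_reader_sleep_time(0.5)    # wait 0.5 s when a read returns no records
+consumer.set_checkpoint_callback(on_checkpoint)
+```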
+
+The producer is rather trivial:
+
+```python
+from async_kinesis_client.kinesis_producer import AsyncKinesisProducer
+
+# ...
+
+async def write_stream():
+ producer = AsyncKinesisProducer(
+ stream_name='my-stream',
+ ordered=True
+ )
+
+ await producer.put_record(
+ record=b'bytes',
+ partition_key='string', # optional, if none, default time-based key is used
+ explicit_hash_key='string' # optional
+ )
+
+```
+
+Sending multiple records at once:
+
+```python
+from async_kinesis_client.kinesis_producer import AsyncKinesisProducer
+
+# ...
+
+async def write_stream():
+ producer = AsyncKinesisProducer(
+ stream_name='my-stream',
+ ordered=True
+ )
+
+ records = [
+ {
+ 'Data': b'bytes',
+ 'PartitionKey': 'string', # optional, if none, default time-based key is used
+ 'ExplicitHashKey': 'string' # optional
+ },
+ ...
+ ]
+
+ response = await producer.put_records(
+ records=records
+ )
+
+ # See boto3 docs for response structure:
+ # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records
+```
+
+
+AWS authentication: for testing outside the AWS cloud, especially when Multi-Factor Authentication is in use, I find the following snippet extremely useful:
+```python
+import os
+import aioboto3
+from botocore import credentials
+from aiobotocore import AioSession
+
+working_dir = os.path.join(os.path.expanduser('~'), '.aws/cli/cache')
+session = AioSession(profile=os.environ.get('AWS_PROFILE'))
+provider = session.get_component('credential_provider').get_provider('assume-role')
+provider.cache = credentials.JSONFileCache(working_dir)
+aioboto3.setup_default_session(botocore_session=session)
+
+```
+
+This allows re-using a cached session token after completing any aws command under *awsudo*; all you need to do is set the AWS_PROFILE environment variable.
+
+Currently the library has not been tested enough against various network events.
+Use it at your own risk; you've been warned.
+
+
+
+
+%package help
+Summary: Development documents and examples for async-kinesis-client
+Provides: python3-async-kinesis-client-doc
+%description help
+# async-kinesis-client
+Python Kinesis Client library utilising asyncio
+
+Based on the Kinesis-Python project by Evan Borgstrom <eborgstrom@nerdwallet.com>
+(https://github.com/NerdWalletOSS/kinesis-python), but with asyncio magic.
+
+The problem with Kinesis-Python is that all the data ends up in a single thread
+and is checkpointed from there, so despite having many processes, the client
+is clogged by checkpointing. Besides, it checkpoints every single record, and this is
+not configurable.
+
+This client is based on the aioboto3 library and uses Python 3.6+ async methods.
+
+Usage:
+
+```python
+import asyncio
+from async_kinesis_client.kinesis_consumer import AsyncKinesisConsumer
+
+async def read_stream():
+
+ # This is a coroutine that reads all the records from a shard
+ async def read_records(shard_reader):
+ async for records in shard_reader.get_records():
+ for r in records:
+ print('Shard: {}; Record: {}'.format(shard_reader.shard_id, r))
+
+ consumer = AsyncKinesisConsumer(
+ stream_name='my-stream',
+ checkpoint_table='my-checkpoint-table')
+
+ # consumer will yield existing shards and will continue yielding
+ # new shards if re-sharding happens
+ async for shard_reader in consumer.get_shard_readers():
+ print('Got shard reader for shard id: {}'.format(shard_reader.shard_id))
+ asyncio.ensure_future(read_records(shard_reader))
+
+asyncio.get_event_loop().run_until_complete(read_stream())
+
+```
+
+*AsyncShardReader* and *AsyncKinesisConsumer* can be stopped from a parallel coroutine by calling the *stop()* method;
+the consumer will stop all of its shard readers in that case.
+If you want to be notified of shard closing, catch *ShardClosedException* while reading records.
+
+*AsyncShardReader* exposes the property *millis_behind_latest*, which can be useful for gauging application performance.
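+
+Below is a minimal sketch of these three facilities together. It assumes *ShardClosedException* is importable from the consumer module (adjust the import to wherever your installed version exposes it), and it shows *stop()* as a plain call; await it instead if your version defines it as a coroutine.
+
+```python
+import asyncio
+from async_kinesis_client.kinesis_consumer import AsyncKinesisConsumer
+# Assumption: ShardClosedException lives next to the consumer class; adjust the
+# import if your installed version exposes it from a different module.
+from async_kinesis_client.kinesis_consumer import ShardClosedException
+
+async def read_records(shard_reader):
+    try:
+        async for records in shard_reader.get_records():
+            # millis_behind_latest shows how far this reader lags behind the tip of the shard
+            print('Shard {} is {} ms behind latest'.format(
+                shard_reader.shard_id, shard_reader.millis_behind_latest))
+            for r in records:
+                print('Record: {}'.format(r))
+    except ShardClosedException:
+        # The shard was closed (e.g. after re-sharding); the consumer will keep
+        # yielding readers for any child shards.
+        print('Shard {} closed'.format(shard_reader.shard_id))
+
+async def stop_after(consumer, delay):
+    # A parallel coroutine that stops the consumer; the consumer then stops
+    # all of its shard readers.
+    await asyncio.sleep(delay)
+    consumer.stop()
+```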
+
+*AsyncKinesisConsumer* has the following configuration methods (a usage sketch follows the list):
+
+*set_checkpoint_interval(records)* - how many records to skip before checkpointing
+
+*set_lock_duration(time)* - how many seconds to hold the lock. The consumer will attempt to refresh the lock before that time elapses
+
+*set_reader_sleep_time(time)* - how long the shard reader should wait (in seconds, fractions allowed) if it did not receive any records from the Kinesis stream
+
+*set_checkpoint_callback(coro)* - set a callback coroutine to be called before checkpointing the next batch of records. Coroutine arguments: *ShardId*, *SequenceNumber*
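+
+A hedged sketch of wiring these setters together (the numeric values are arbitrary examples, and the callback parameter names are illustrative; the callback receives the shard id and sequence number described above):
+
+```python
+from async_kinesis_client.kinesis_consumer import AsyncKinesisConsumer
+
+async def on_checkpoint(shard_id, sequence_number):
+    # Called before the next batch of records is checkpointed
+    print('Checkpointing shard {} at sequence {}'.format(shard_id, sequence_number))
+
+consumer = AsyncKinesisConsumer(
+    stream_name='my-stream',
+    checkpoint_table='my-checkpoint-table')
+
+consumer.set_checkpoint_interval(100)  # checkpoint after every 100 records (example value)
+consumer.set_lock_duration(30)         # hold the shard lock for 30 seconds (example value)
+consumer.set_reader_sleep_time(0.5)    # wait 0.5 s when a read returns no records
+consumer.set_checkpoint_callback(on_checkpoint)
+```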
+
+The producer is rather trivial:
+
+```python
+from async_kinesis_client.kinesis_producer import AsyncKinesisProducer
+
+# ...
+
+async def write_stream():
+ producer = AsyncKinesisProducer(
+ stream_name='my-stream',
+ ordered=True
+ )
+
+ await producer.put_record(
+ record=b'bytes',
+ partition_key='string', # optional, if none, default time-based key is used
+ explicit_hash_key='string' # optional
+ )
+
+```
+
+Sending multiple records at once:
+
+```python
+from async_kinesis_client.kinesis_producer import AsyncKinesisProducer
+
+# ...
+
+async def write_stream():
+ producer = AsyncKinesisProducer(
+ stream_name='my-stream',
+ ordered=True
+ )
+
+ records = [
+ {
+ 'Data': b'bytes',
+ 'PartitionKey': 'string', # optional, if none, default time-based key is used
+ 'ExplicitHashKey': 'string' # optional
+ },
+ ...
+ ]
+
+ response = await producer.put_records(
+ records=records
+ )
+
+ # See boto3 docs for response structure:
+ # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records
+```
+
+
+AWS authentication: for testing outside the AWS cloud, especially when Multi-Factor Authentication is in use, I find the following snippet extremely useful:
+```python
+import os
+import aioboto3
+from botocore import credentials
+from aiobotocore import AioSession
+
+working_dir = os.path.join(os.path.expanduser('~'), '.aws/cli/cache')
+session = AioSession(profile=os.environ.get('AWS_PROFILE'))
+provider = session.get_component('credential_provider').get_provider('assume-role')
+provider.cache = credentials.JSONFileCache(working_dir)
+aioboto3.setup_default_session(botocore_session=session)
+
+```
+
+This allows re-using a cached session token after completing any aws command under *awsudo*; all you need to do is set the AWS_PROFILE environment variable.
+
+Currently the library has not been tested enough against various network events.
+Use it at your own risk; you've been warned.
+
+
+
+
+%prep
+%autosetup -n async-kinesis-client-0.2.14
+
+%build
+%py3_build
+
+%install
+%py3_install
+install -d -m755 %{buildroot}/%{_pkgdocdir}
+if [ -d doc ]; then cp -arf doc %{buildroot}/%{_pkgdocdir}; fi
+if [ -d docs ]; then cp -arf docs %{buildroot}/%{_pkgdocdir}; fi
+if [ -d example ]; then cp -arf example %{buildroot}/%{_pkgdocdir}; fi
+if [ -d examples ]; then cp -arf examples %{buildroot}/%{_pkgdocdir}; fi
+pushd %{buildroot}
+if [ -d usr/lib ]; then
+ find usr/lib -type f -printf "/%h/%f\n" >> filelist.lst
+fi
+if [ -d usr/lib64 ]; then
+ find usr/lib64 -type f -printf "/%h/%f\n" >> filelist.lst
+fi
+if [ -d usr/bin ]; then
+ find usr/bin -type f -printf "/%h/%f\n" >> filelist.lst
+fi
+if [ -d usr/sbin ]; then
+ find usr/sbin -type f -printf "/%h/%f\n" >> filelist.lst
+fi
+touch doclist.lst
+if [ -d usr/share/man ]; then
+ find usr/share/man -type f -printf "/%h/%f.gz\n" >> doclist.lst
+fi
+popd
+mv %{buildroot}/filelist.lst .
+mv %{buildroot}/doclist.lst .
+
+%files -n python3-async-kinesis-client -f filelist.lst
+%dir %{python3_sitelib}/*
+
+%files help -f doclist.lst
+%{_docdir}/*
+
+%changelog
+* Wed May 31 2023 Python_Bot <Python_Bot@openeuler.org> - 0.2.14-1
+- Package Spec generated