%global _empty_manifest_terminate_build 0
Name:           python-storey
Version:        1.3.15
Release:        1
Summary:        Async flows
License:        Apache
URL:            https://github.com/mlrun/storey
Source0:        https://mirrors.nju.edu.cn/pypi/web/packages/bc/2d/de49b705e44b582a7599beff4d51e2aa98984d27cff3ac38ea5315b4b265/storey-1.3.15.tar.gz
BuildArch:      noarch

Requires:       python3-aiohttp
Requires:       python3-v3io
Requires:       python3-pandas
Requires:       python3-numpy
Requires:       python3-pyarrow
Requires:       python3-v3io-frames
Requires:       python3-v3iofs
Requires:       python3-xxhash
Requires:       python3-adlfs
Requires:       python3-kafka-python
Requires:       python3-lupa
Requires:       python3-redis
Requires:       python3-s3fs
Requires:       python3-sqlalchemy

%description
# Storey

[![CI](https://github.com/mlrun/storey/workflows/CI/badge.svg)](https://github.com/mlrun/storey/actions?query=workflow%3ACI)

Storey is an asynchronous streaming library for real-time event processing and feature extraction.

#### In This Document
- [API Walkthrough](#api-walkthrough)
- [Usage Examples](#usage-examples)

▶ For more information, see the [Storey Python package documentation](https://storey.readthedocs.io).

## API Walkthrough

A Storey flow consists of steps linked together by the `build_flow` function, each doing its designated work.

### Supported Steps

#### Input Steps
* `SyncEmitSource`
* `AsyncEmitSource`
* `CSVSource`
* `ParquetSource`
* `DataframeSource`

#### Processing Steps
* `Filter`
* `Map`
* `FlatMap`
* `MapWithState`
* `Batch(max_events, timeout)` - Batches events. This step emits a batch every `max_events` events, or when `timeout` seconds have passed since the first event in the batch was received.
* `Choice`
* `JoinWithV3IOTable`
* `SendToHttp`
* `AggregateByKey(aggregations, cache, key=None, emit_policy=EmitEveryEvent(), augmentation_fn=None)` - Aggregates the data into the given cache object for later persistence, and outputs an event enriched with the requested aggregation features.
* `QueryByKey(features, cache, key=None, augmentation_fn=None, aliases=None)` - Similar to `AggregateByKey`, but this step is for serving only and does not aggregate the event.
* `NoSqlTarget(table)` - Persists the data in `table` to its associated storage by key.
* `Extend`
* `JoinWithTable`

#### Output Steps
* `Complete`
* `Reduce`
* `StreamTarget`
* `CSVTarget`
* `ReduceToDataFrame`
* `TSDBTarget`
* `ParquetTarget`

## Usage Examples

### Using Aggregates

The following example reads user data, creates features using Storey's aggregates, persists the data to V3IO, and emits events containing the features to a V3IO stream for further processing.

```python
from storey import build_flow, SyncEmitSource, Table, V3ioDriver, MapWithState, AggregateByKey, FieldAggregator, NoSqlTarget, StreamTarget
from storey.dtypes import SlidingWindows

v3io_web_api = "https://webapi.change-me.com"
v3io_access_key = "1284ne83-i262-46m6-9a23-810n41f169ea"
table_object = Table("/projects/my_features", V3ioDriver(v3io_web_api, v3io_access_key))

def enrich(event, state):
    if "first_activity" not in state:
        state["first_activity"] = event.time
    event.body["time_since_activity"] = (event.body["time"] - state["first_activity"]).seconds
    state["last_event"] = event.time
    event.body["total_activities"] = state["total_activities"] = state.get("total_activities", 0) + 1
    return event, state

controller = build_flow([
    SyncEmitSource(),
    MapWithState(table_object, enrich, group_by_key=True, full_event=True),
    AggregateByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                    SlidingWindows(["1h", "2h", "24h"], "10m")),
                    FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                    SlidingWindows(["1h", "2h", "24h"], "10m")),
                    FieldAggregator("failed_activities", "activity", ["count"],
                                    SlidingWindows(["1h"], "10m"),
                                    aggr_filter=lambda element: element["activity_status"] == "fail")],
                   table_object, time_field="time"),
    NoSqlTarget(table_object),
    StreamTarget(V3ioDriver(v3io_web_api, v3io_access_key), "features_stream")
]).run()
```

We can also create a serving function, whose sole purpose is to read data from the feature store and emit it further:

```python
controller = build_flow([
    SyncEmitSource(),
    QueryAggregationByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                           SlidingWindows(["1h", "2h", "24h"], "10m")),
                           FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                           SlidingWindows(["1h", "2h", "24h"], "10m")),
                           FieldAggregator("failed_activities", "activity", ["count"],
                                           SlidingWindows(["1h"], "10m"),
                                           aggr_filter=lambda element: element["activity_status"] == "fail")],
                          table_object, time_field="time")
]).run()
```

%package -n python3-storey
Summary:        Async flows
Provides:       python-storey
BuildRequires:  python3-devel
BuildRequires:  python3-setuptools
BuildRequires:  python3-pip

%description -n python3-storey
# Storey

[![CI](https://github.com/mlrun/storey/workflows/CI/badge.svg)](https://github.com/mlrun/storey/actions?query=workflow%3ACI)

Storey is an asynchronous streaming library for real-time event processing and feature extraction.

#### In This Document
- [API Walkthrough](#api-walkthrough)
- [Usage Examples](#usage-examples)

▶ For more information, see the [Storey Python package documentation](https://storey.readthedocs.io).

## API Walkthrough

A Storey flow consists of steps linked together by the `build_flow` function, each doing its designated work.

### Supported Steps

#### Input Steps
* `SyncEmitSource`
* `AsyncEmitSource`
* `CSVSource`
* `ParquetSource`
* `DataframeSource`

#### Processing Steps
* `Filter`
* `Map`
* `FlatMap`
* `MapWithState`
* `Batch(max_events, timeout)` - Batches events. This step emits a batch every `max_events` events, or when `timeout` seconds have passed since the first event in the batch was received.
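The `Batch(max_events, timeout)` emit rule above can be sketched in plain Python. This is a toy illustration, not Storey's implementation: `ToyBatcher` is a hypothetical name, and it checks the flush condition only when an event arrives, whereas the real step also flushes on an asynchronous timer.

```python
class ToyBatcher:
    """Toy sketch of the Batch(max_events, timeout) emit rule: emit when
    max_events are buffered, or when timeout seconds have passed since
    the first event of the current batch."""

    def __init__(self, max_events, timeout):
        self.max_events = max_events
        self.timeout = timeout
        self.buffer = []
        self.first_event_time = None

    def add(self, event, now):
        # Remember when the current batch started.
        if not self.buffer:
            self.first_event_time = now
        self.buffer.append(event)
        # Flush on either condition: batch is full, or too old.
        if len(self.buffer) >= self.max_events or now - self.first_event_time >= self.timeout:
            batch, self.buffer = self.buffer, []
            return batch
        return None  # keep buffering


batcher = ToyBatcher(max_events=3, timeout=10.0)
assert batcher.add("a", now=0.0) is None
assert batcher.add("b", now=1.0) is None
assert batcher.add("c", now=2.0) == ["a", "b", "c"]  # max_events reached
assert batcher.add("d", now=100.0) is None
assert batcher.add("e", now=111.0) == ["d", "e"]     # timeout since first event
```

A real event clock (e.g. `time.monotonic()`) would be passed as `now`; explicit timestamps are used here so the behavior is deterministic.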
* `Choice`
* `JoinWithV3IOTable`
* `SendToHttp`
* `AggregateByKey(aggregations, cache, key=None, emit_policy=EmitEveryEvent(), augmentation_fn=None)` - Aggregates the data into the given cache object for later persistence, and outputs an event enriched with the requested aggregation features.
* `QueryByKey(features, cache, key=None, augmentation_fn=None, aliases=None)` - Similar to `AggregateByKey`, but this step is for serving only and does not aggregate the event.
* `NoSqlTarget(table)` - Persists the data in `table` to its associated storage by key.
* `Extend`
* `JoinWithTable`

#### Output Steps
* `Complete`
* `Reduce`
* `StreamTarget`
* `CSVTarget`
* `ReduceToDataFrame`
* `TSDBTarget`
* `ParquetTarget`

## Usage Examples

### Using Aggregates

The following example reads user data, creates features using Storey's aggregates, persists the data to V3IO, and emits events containing the features to a V3IO stream for further processing.

```python
from storey import build_flow, SyncEmitSource, Table, V3ioDriver, MapWithState, AggregateByKey, FieldAggregator, NoSqlTarget, StreamTarget
from storey.dtypes import SlidingWindows

v3io_web_api = "https://webapi.change-me.com"
v3io_access_key = "1284ne83-i262-46m6-9a23-810n41f169ea"
table_object = Table("/projects/my_features", V3ioDriver(v3io_web_api, v3io_access_key))

def enrich(event, state):
    if "first_activity" not in state:
        state["first_activity"] = event.time
    event.body["time_since_activity"] = (event.body["time"] - state["first_activity"]).seconds
    state["last_event"] = event.time
    event.body["total_activities"] = state["total_activities"] = state.get("total_activities", 0) + 1
    return event, state

controller = build_flow([
    SyncEmitSource(),
    MapWithState(table_object, enrich, group_by_key=True, full_event=True),
    AggregateByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                    SlidingWindows(["1h", "2h", "24h"], "10m")),
                    FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                    SlidingWindows(["1h", "2h", "24h"], "10m")),
                    FieldAggregator("failed_activities", "activity", ["count"],
                                    SlidingWindows(["1h"], "10m"),
                                    aggr_filter=lambda element: element["activity_status"] == "fail")],
                   table_object, time_field="time"),
    NoSqlTarget(table_object),
    StreamTarget(V3ioDriver(v3io_web_api, v3io_access_key), "features_stream")
]).run()
```

We can also create a serving function, whose sole purpose is to read data from the feature store and emit it further:

```python
controller = build_flow([
    SyncEmitSource(),
    QueryAggregationByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                           SlidingWindows(["1h", "2h", "24h"], "10m")),
                           FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                           SlidingWindows(["1h", "2h", "24h"], "10m")),
                           FieldAggregator("failed_activities", "activity", ["count"],
                                           SlidingWindows(["1h"], "10m"),
                                           aggr_filter=lambda element: element["activity_status"] == "fail")],
                          table_object, time_field="time")
]).run()
```

%package help
Summary:        Development documents and examples for storey
Provides:       python3-storey-doc

%description help
# Storey

[![CI](https://github.com/mlrun/storey/workflows/CI/badge.svg)](https://github.com/mlrun/storey/actions?query=workflow%3ACI)

Storey is an asynchronous streaming library for real-time event processing and feature extraction.

#### In This Document
- [API Walkthrough](#api-walkthrough)
- [Usage Examples](#usage-examples)

▶ For more information, see the [Storey Python package documentation](https://storey.readthedocs.io).

## API Walkthrough

A Storey flow consists of steps linked together by the `build_flow` function, each doing its designated work.

### Supported Steps

#### Input Steps
* `SyncEmitSource`
* `AsyncEmitSource`
* `CSVSource`
* `ParquetSource`
* `DataframeSource`

#### Processing Steps
* `Filter`
* `Map`
* `FlatMap`
* `MapWithState`
* `Batch(max_events, timeout)` - Batches events. This step emits a batch every `max_events` events, or when `timeout` seconds have passed since the first event in the batch was received.
* `Choice`
* `JoinWithV3IOTable`
* `SendToHttp`
* `AggregateByKey(aggregations, cache, key=None, emit_policy=EmitEveryEvent(), augmentation_fn=None)` - Aggregates the data into the given cache object for later persistence, and outputs an event enriched with the requested aggregation features.
* `QueryByKey(features, cache, key=None, augmentation_fn=None, aliases=None)` - Similar to `AggregateByKey`, but this step is for serving only and does not aggregate the event.
* `NoSqlTarget(table)` - Persists the data in `table` to its associated storage by key.
* `Extend`
* `JoinWithTable`

#### Output Steps
* `Complete`
* `Reduce`
* `StreamTarget`
* `CSVTarget`
* `ReduceToDataFrame`
* `TSDBTarget`
* `ParquetTarget`

## Usage Examples

### Using Aggregates

The following example reads user data, creates features using Storey's aggregates, persists the data to V3IO, and emits events containing the features to a V3IO stream for further processing.

```python
from storey import build_flow, SyncEmitSource, Table, V3ioDriver, MapWithState, AggregateByKey, FieldAggregator, NoSqlTarget, StreamTarget
from storey.dtypes import SlidingWindows

v3io_web_api = "https://webapi.change-me.com"
v3io_access_key = "1284ne83-i262-46m6-9a23-810n41f169ea"
table_object = Table("/projects/my_features", V3ioDriver(v3io_web_api, v3io_access_key))

def enrich(event, state):
    if "first_activity" not in state:
        state["first_activity"] = event.time
    event.body["time_since_activity"] = (event.body["time"] - state["first_activity"]).seconds
    state["last_event"] = event.time
    event.body["total_activities"] = state["total_activities"] = state.get("total_activities", 0) + 1
    return event, state

controller = build_flow([
    SyncEmitSource(),
    MapWithState(table_object, enrich, group_by_key=True, full_event=True),
    AggregateByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                    SlidingWindows(["1h", "2h", "24h"], "10m")),
                    FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                    SlidingWindows(["1h", "2h", "24h"], "10m")),
                    FieldAggregator("failed_activities", "activity", ["count"],
                                    SlidingWindows(["1h"], "10m"),
                                    aggr_filter=lambda element: element["activity_status"] == "fail")],
                   table_object, time_field="time"),
    NoSqlTarget(table_object),
    StreamTarget(V3ioDriver(v3io_web_api, v3io_access_key), "features_stream")
]).run()
```

We can also create a serving function, whose sole purpose is to read data from the feature store and emit it further:

```python
controller = build_flow([
    SyncEmitSource(),
    QueryAggregationByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                           SlidingWindows(["1h", "2h", "24h"], "10m")),
                           FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                           SlidingWindows(["1h", "2h", "24h"], "10m")),
                           FieldAggregator("failed_activities", "activity", ["count"],
                                           SlidingWindows(["1h"], "10m"),
                                           aggr_filter=lambda element: element["activity_status"] == "fail")],
                          table_object, time_field="time")
]).run()
```

%prep
%autosetup -n storey-1.3.15

%build
%py3_build

%install
%py3_install
install -d -m755 %{buildroot}/%{_pkgdocdir}
if [ -d doc ]; then cp -arf doc %{buildroot}/%{_pkgdocdir}; fi
if [ -d docs ]; then cp -arf docs %{buildroot}/%{_pkgdocdir}; fi
if [ -d example ]; then cp -arf example %{buildroot}/%{_pkgdocdir}; fi
if [ -d examples ]; then cp -arf examples %{buildroot}/%{_pkgdocdir}; fi
pushd %{buildroot}
if [ -d usr/lib ]; then
    find usr/lib -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/lib64 ]; then
    find usr/lib64 -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/bin ]; then
    find usr/bin -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/sbin ]; then
    find usr/sbin -type f -printf "/%h/%f\n" >> filelist.lst
fi
touch doclist.lst
if [ -d usr/share/man ]; then
    find usr/share/man -type f -printf "/%h/%f.gz\n" >> doclist.lst
fi
popd
mv %{buildroot}/filelist.lst .
mv %{buildroot}/doclist.lst .
%files -n python3-storey -f filelist.lst
%dir %{python3_sitelib}/*

%files help -f doclist.lst
%{_docdir}/*

%changelog
* Wed Apr 12 2023 Python_Bot - 1.3.15-1
- Package Spec generated