%global _empty_manifest_terminate_build 0
Name: python-storey
Version: 1.3.15
Release: 1
Summary: Async flows
License: Apache-2.0
URL: https://github.com/mlrun/storey
Source0: https://mirrors.nju.edu.cn/pypi/web/packages/bc/2d/de49b705e44b582a7599beff4d51e2aa98984d27cff3ac38ea5315b4b265/storey-1.3.15.tar.gz
BuildArch: noarch
Requires: python3-aiohttp
Requires: python3-v3io
Requires: python3-pandas
Requires: python3-numpy
Requires: python3-pyarrow
Requires: python3-v3io-frames
Requires: python3-v3iofs
Requires: python3-xxhash
Requires: python3-adlfs
Requires: python3-kafka-python
Requires: python3-lupa
Requires: python3-redis
Requires: python3-s3fs
Requires: python3-sqlalchemy
%description
# Storey
[![CI](https://github.com/mlrun/storey/workflows/CI/badge.svg)](https://github.com/mlrun/storey/actions?query=workflow%3ACI)
Storey is an asynchronous streaming library for real-time event processing and feature extraction.
#### In This Document
- [API Walkthrough](#api-walkthrough)
- [Usage Examples](#usage-examples)
▶ For more information, see the [Storey Python package documentation](https://storey.readthedocs.io).
## API Walkthrough
A Storey flow consists of steps linked together by the `build_flow` function, each doing its designated work.
### Supported Steps
#### Input Steps
* `SyncEmitSource`
* `AsyncEmitSource`
* `CSVSource`
* `ParquetSource`
* `DataframeSource`
#### Processing Steps
* `Filter`
* `Map`
* `FlatMap`
* `MapWithState`
* `Batch(max_events, timeout)` - Batches events. This step emits a batch every `max_events` events, or when `timeout` seconds have passed since the first event in the batch was received.
* `Choice`
* `JoinWithV3IOTable`
* `SendToHttp`
* `AggregateByKey(aggregations, cache, key=None, emit_policy=EmitEveryEvent(), augmentation_fn=None)` - Aggregates the data into the provided cache object for later persistence, and outputs an event enriched with the requested aggregation features.
* `QueryByKey(features, cache, key=None, augmentation_fn=None, aliases=None)` - Similar to `AggregateByKey`, but this step is for serving only and does not aggregate the event.
* `NoSqlTarget(table)` - Persists the data in `table` to its associated storage by key.
* `Extend`
* `JoinWithTable`
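
The `Batch` step's flush policy (emit on `max_events` or on `timeout`, whichever comes first) can be sketched library-free. `MiniBatcher` and its methods below are illustrative names, not part of Storey's API:

```python
import time

class MiniBatcher:
    """Toy illustration of Batch(max_events, timeout) semantics: a batch is
    flushed when it reaches max_events, or when timeout seconds have passed
    since the first event in the current batch arrived."""

    def __init__(self, max_events, timeout, clock=time.monotonic):
        self.max_events = max_events
        self.timeout = timeout
        self.clock = clock          # injectable, which makes testing easy
        self.batch = []
        self.first_event_time = None
        self.flushed = []           # batches emitted so far

    def receive(self, event):
        if not self.batch:
            self.first_event_time = self.clock()
        self.batch.append(event)
        if len(self.batch) >= self.max_events:
            self.flush()

    def check_timeout(self):
        # Called periodically; flushes a partial batch once timeout elapses.
        if self.batch and self.clock() - self.first_event_time >= self.timeout:
            self.flush()

    def flush(self):
        self.flushed.append(self.batch)
        self.batch = []
        self.first_event_time = None
```

With `max_events=3` and `timeout=10`, three quick events flush immediately, while a partial batch is flushed once ten seconds have passed since its first event.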
#### Output Steps
* `Complete`
* `Reduce`
* `StreamTarget`
* `CSVTarget`
* `ReduceToDataFrame`
* `TSDBTarget`
* `ParquetTarget`
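
The linked-steps idea behind `build_flow` can be illustrated with a minimal, library-free sketch. `run_flow` and the lambda steps here are hypothetical stand-ins for the concept, not Storey's actual implementation:

```python
def run_flow(steps, events):
    """Push each event through a chain of steps. A step returning None
    drops the event (like Filter); otherwise its result feeds the next step."""
    results = []
    for event in events:
        for step in steps:
            event = step(event)
            if event is None:
                break
        else:
            results.append(event)
    return results

# A Filter-like step followed by a Map-like step.
steps = [
    lambda x: x if x % 2 == 0 else None,  # keep even events only
    lambda x: x * 10,                     # transform each surviving event
]
print(run_flow(steps, range(6)))  # → [0, 20, 40]
```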
## Usage Examples
### Using Aggregates
The following example reads user data, creates features using Storey's aggregates, persists the data to V3IO and emits events containing the features to a V3IO Stream for further processing.
```python
from storey import (build_flow, SyncEmitSource, Table, V3ioDriver, MapWithState,
                    AggregateByKey, FieldAggregator, NoSqlTarget, StreamTarget)
from storey.dtypes import SlidingWindows

v3io_web_api = "https://webapi.change-me.com"
v3io_access_key = "1284ne83-i262-46m6-9a23-810n41f169ea"
table_object = Table("/projects/my_features", V3ioDriver(v3io_web_api, v3io_access_key))

def enrich(event, state):
    if "first_activity" not in state:
        state["first_activity"] = event.time
    event.body["time_since_activity"] = (event.body["time"] - state["first_activity"]).seconds
    state["last_event"] = event.time
    event.body["total_activities"] = state["total_activities"] = state.get("total_activities", 0) + 1
    return event, state

controller = build_flow([
    SyncEmitSource(),
    MapWithState(table_object, enrich, group_by_key=True, full_event=True),
    AggregateByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                    SlidingWindows(["1h", "2h", "24h"], "10m")),
                    FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                    SlidingWindows(["1h", "2h", "24h"], "10m")),
                    FieldAggregator("failed_activities", "activity", ["count"],
                                    SlidingWindows(["1h"], "10m"),
                                    aggr_filter=lambda element: element["activity_status"] == "fail")],
                   table_object,
                   time_field="time"),
    NoSqlTarget(table_object),
    StreamTarget(V3ioDriver(v3io_web_api, v3io_access_key), "features_stream")
]).run()
```
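
The `SlidingWindows(["1h", "2h", "24h"], "10m")` spec above maintains each aggregate over windows of those lengths, advancing at 10-minute granularity. The usual way such windowed aggregates are kept is with fixed-size time buckets; the following is a library-free sketch of a bucketed sliding count (`sliding_count` is an illustrative name, not a Storey function):

```python
from collections import deque

def sliding_count(event_times, window_seconds, granularity_seconds):
    """Count events in a sliding window using fixed-size time buckets:
    each event lands in the bucket covering its timestamp, and buckets
    older than the window are evicted. Returns the count after each event."""
    buckets = deque()  # (bucket_start, count), oldest first
    results = []
    for t in event_times:  # timestamps in seconds, ascending
        bucket_start = t - (t % granularity_seconds)
        if buckets and buckets[-1][0] == bucket_start:
            buckets[-1] = (bucket_start, buckets[-1][1] + 1)
        else:
            buckets.append((bucket_start, 1))
        # evict buckets that fell entirely out of the window
        while buckets and buckets[0][0] <= t - window_seconds:
            buckets.popleft()
        results.append(sum(count for _, count in buckets))
    return results
```

Bucketing trades a little precision (up to one granularity step) for constant memory per window, which is why window specs pair a length with a granularity.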
We can also create a serving function, whose sole purpose is to read data from the feature store and emit it further:
```python
controller = build_flow([
    SyncEmitSource(),
    QueryAggregationByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                           SlidingWindows(["1h", "2h", "24h"], "10m")),
                           FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                           SlidingWindows(["1h", "2h", "24h"], "10m")),
                           FieldAggregator("failed_activities", "activity", ["count"],
                                           SlidingWindows(["1h"], "10m"),
                                           aggr_filter=lambda element: element["activity_status"] == "fail")],
                          table_object,
                          time_field="time")
]).run()
```
%package -n python3-storey
Summary: Async flows
Provides: python-storey
BuildRequires: python3-devel
BuildRequires: python3-setuptools
BuildRequires: python3-pip
%description -n python3-storey
# Storey
[![CI](https://github.com/mlrun/storey/workflows/CI/badge.svg)](https://github.com/mlrun/storey/actions?query=workflow%3ACI)
Storey is an asynchronous streaming library for real-time event processing and feature extraction.
#### In This Document
- [API Walkthrough](#api-walkthrough)
- [Usage Examples](#usage-examples)
▶ For more information, see the [Storey Python package documentation](https://storey.readthedocs.io).
## API Walkthrough
A Storey flow consists of steps linked together by the `build_flow` function, each doing its designated work.
### Supported Steps
#### Input Steps
* `SyncEmitSource`
* `AsyncEmitSource`
* `CSVSource`
* `ParquetSource`
* `DataframeSource`
#### Processing Steps
* `Filter`
* `Map`
* `FlatMap`
* `MapWithState`
* `Batch(max_events, timeout)` - Batches events. This step emits a batch every `max_events` events, or when `timeout` seconds have passed since the first event in the batch was received.
* `Choice`
* `JoinWithV3IOTable`
* `SendToHttp`
* `AggregateByKey(aggregations, cache, key=None, emit_policy=EmitEveryEvent(), augmentation_fn=None)` - Aggregates the data into the provided cache object for later persistence, and outputs an event enriched with the requested aggregation features.
* `QueryByKey(features, cache, key=None, augmentation_fn=None, aliases=None)` - Similar to `AggregateByKey`, but this step is for serving only and does not aggregate the event.
* `NoSqlTarget(table)` - Persists the data in `table` to its associated storage by key.
* `Extend`
* `JoinWithTable`
#### Output Steps
* `Complete`
* `Reduce`
* `StreamTarget`
* `CSVTarget`
* `ReduceToDataFrame`
* `TSDBTarget`
* `ParquetTarget`
## Usage Examples
### Using Aggregates
The following example reads user data, creates features using Storey's aggregates, persists the data to V3IO and emits events containing the features to a V3IO Stream for further processing.
```python
from storey import (build_flow, SyncEmitSource, Table, V3ioDriver, MapWithState,
                    AggregateByKey, FieldAggregator, NoSqlTarget, StreamTarget)
from storey.dtypes import SlidingWindows

v3io_web_api = "https://webapi.change-me.com"
v3io_access_key = "1284ne83-i262-46m6-9a23-810n41f169ea"
table_object = Table("/projects/my_features", V3ioDriver(v3io_web_api, v3io_access_key))

def enrich(event, state):
    if "first_activity" not in state:
        state["first_activity"] = event.time
    event.body["time_since_activity"] = (event.body["time"] - state["first_activity"]).seconds
    state["last_event"] = event.time
    event.body["total_activities"] = state["total_activities"] = state.get("total_activities", 0) + 1
    return event, state

controller = build_flow([
    SyncEmitSource(),
    MapWithState(table_object, enrich, group_by_key=True, full_event=True),
    AggregateByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                    SlidingWindows(["1h", "2h", "24h"], "10m")),
                    FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                    SlidingWindows(["1h", "2h", "24h"], "10m")),
                    FieldAggregator("failed_activities", "activity", ["count"],
                                    SlidingWindows(["1h"], "10m"),
                                    aggr_filter=lambda element: element["activity_status"] == "fail")],
                   table_object,
                   time_field="time"),
    NoSqlTarget(table_object),
    StreamTarget(V3ioDriver(v3io_web_api, v3io_access_key), "features_stream")
]).run()
```
We can also create a serving function, whose sole purpose is to read data from the feature store and emit it further:
```python
controller = build_flow([
    SyncEmitSource(),
    QueryAggregationByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                           SlidingWindows(["1h", "2h", "24h"], "10m")),
                           FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                           SlidingWindows(["1h", "2h", "24h"], "10m")),
                           FieldAggregator("failed_activities", "activity", ["count"],
                                           SlidingWindows(["1h"], "10m"),
                                           aggr_filter=lambda element: element["activity_status"] == "fail")],
                          table_object,
                          time_field="time")
]).run()
```
%package help
Summary: Development documents and examples for storey
Provides: python3-storey-doc
%description help
# Storey
[![CI](https://github.com/mlrun/storey/workflows/CI/badge.svg)](https://github.com/mlrun/storey/actions?query=workflow%3ACI)
Storey is an asynchronous streaming library for real-time event processing and feature extraction.
#### In This Document
- [API Walkthrough](#api-walkthrough)
- [Usage Examples](#usage-examples)
▶ For more information, see the [Storey Python package documentation](https://storey.readthedocs.io).
## API Walkthrough
A Storey flow consists of steps linked together by the `build_flow` function, each doing its designated work.
### Supported Steps
#### Input Steps
* `SyncEmitSource`
* `AsyncEmitSource`
* `CSVSource`
* `ParquetSource`
* `DataframeSource`
#### Processing Steps
* `Filter`
* `Map`
* `FlatMap`
* `MapWithState`
* `Batch(max_events, timeout)` - Batches events. This step emits a batch every `max_events` events, or when `timeout` seconds have passed since the first event in the batch was received.
* `Choice`
* `JoinWithV3IOTable`
* `SendToHttp`
* `AggregateByKey(aggregations, cache, key=None, emit_policy=EmitEveryEvent(), augmentation_fn=None)` - Aggregates the data into the provided cache object for later persistence, and outputs an event enriched with the requested aggregation features.
* `QueryByKey(features, cache, key=None, augmentation_fn=None, aliases=None)` - Similar to `AggregateByKey`, but this step is for serving only and does not aggregate the event.
* `NoSqlTarget(table)` - Persists the data in `table` to its associated storage by key.
* `Extend`
* `JoinWithTable`
#### Output Steps
* `Complete`
* `Reduce`
* `StreamTarget`
* `CSVTarget`
* `ReduceToDataFrame`
* `TSDBTarget`
* `ParquetTarget`
## Usage Examples
### Using Aggregates
The following example reads user data, creates features using Storey's aggregates, persists the data to V3IO and emits events containing the features to a V3IO Stream for further processing.
```python
from storey import (build_flow, SyncEmitSource, Table, V3ioDriver, MapWithState,
                    AggregateByKey, FieldAggregator, NoSqlTarget, StreamTarget)
from storey.dtypes import SlidingWindows

v3io_web_api = "https://webapi.change-me.com"
v3io_access_key = "1284ne83-i262-46m6-9a23-810n41f169ea"
table_object = Table("/projects/my_features", V3ioDriver(v3io_web_api, v3io_access_key))

def enrich(event, state):
    if "first_activity" not in state:
        state["first_activity"] = event.time
    event.body["time_since_activity"] = (event.body["time"] - state["first_activity"]).seconds
    state["last_event"] = event.time
    event.body["total_activities"] = state["total_activities"] = state.get("total_activities", 0) + 1
    return event, state

controller = build_flow([
    SyncEmitSource(),
    MapWithState(table_object, enrich, group_by_key=True, full_event=True),
    AggregateByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                    SlidingWindows(["1h", "2h", "24h"], "10m")),
                    FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                    SlidingWindows(["1h", "2h", "24h"], "10m")),
                    FieldAggregator("failed_activities", "activity", ["count"],
                                    SlidingWindows(["1h"], "10m"),
                                    aggr_filter=lambda element: element["activity_status"] == "fail")],
                   table_object,
                   time_field="time"),
    NoSqlTarget(table_object),
    StreamTarget(V3ioDriver(v3io_web_api, v3io_access_key), "features_stream")
]).run()
```
We can also create a serving function, whose sole purpose is to read data from the feature store and emit it further:
```python
controller = build_flow([
    SyncEmitSource(),
    QueryAggregationByKey([FieldAggregator("number_of_clicks", "click", ["count"],
                                           SlidingWindows(["1h", "2h", "24h"], "10m")),
                           FieldAggregator("purchases", "purchase_amount", ["avg", "min", "max"],
                                           SlidingWindows(["1h", "2h", "24h"], "10m")),
                           FieldAggregator("failed_activities", "activity", ["count"],
                                           SlidingWindows(["1h"], "10m"),
                                           aggr_filter=lambda element: element["activity_status"] == "fail")],
                          table_object,
                          time_field="time")
]).run()
```
%prep
%autosetup -n storey-1.3.15
%build
%py3_build
%install
%py3_install
install -d -m755 %{buildroot}/%{_pkgdocdir}
if [ -d doc ]; then cp -arf doc %{buildroot}/%{_pkgdocdir}; fi
if [ -d docs ]; then cp -arf docs %{buildroot}/%{_pkgdocdir}; fi
if [ -d example ]; then cp -arf example %{buildroot}/%{_pkgdocdir}; fi
if [ -d examples ]; then cp -arf examples %{buildroot}/%{_pkgdocdir}; fi
pushd %{buildroot}
if [ -d usr/lib ]; then
find usr/lib -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/lib64 ]; then
find usr/lib64 -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/bin ]; then
find usr/bin -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/sbin ]; then
find usr/sbin -type f -printf "/%h/%f\n" >> filelist.lst
fi
touch doclist.lst
if [ -d usr/share/man ]; then
find usr/share/man -type f -printf "/%h/%f.gz\n" >> doclist.lst
fi
popd
mv %{buildroot}/filelist.lst .
mv %{buildroot}/doclist.lst .
%files -n python3-storey -f filelist.lst
%dir %{python3_sitelib}/*
%files help -f doclist.lst
%{_docdir}/*
%changelog
* Wed Apr 12 2023 Python_Bot - 1.3.15-1
- Package Spec generated