diff options
author | CoprDistGit <infra@openeuler.org> | 2023-06-20 03:29:08 +0000 |
---|---|---|
committer | CoprDistGit <infra@openeuler.org> | 2023-06-20 03:29:08 +0000 |
commit | fb29b76b4d6ade1e78d60bbd86034e6699150d55 (patch) | |
tree | 8041afe3b68bb6f00f3dda3d14d66798a374a5aa | |
parent | b5df87ad45492709758f87460c5bc22a64e96774 (diff) |
automatic import of python-dataprimeopeneuler20.03
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | python-dataprime.spec | 1332 | ||||
-rw-r--r-- | sources | 1 |
3 files changed, 1334 insertions, 0 deletions
@@ -0,0 +1 @@ +/dataprime-0.1.9.tar.gz diff --git a/python-dataprime.spec b/python-dataprime.spec new file mode 100644 index 0000000..601e97d --- /dev/null +++ b/python-dataprime.spec @@ -0,0 +1,1332 @@ +%global _empty_manifest_terminate_build 0 +Name: python-dataprime +Version: 0.1.9 +Release: 1 +Summary: 数据智脑开源计算引擎dataprime +License: Mulan PSL v2 +URL: https://pypi.org/project/dataprime/ +Source0: https://mirrors.aliyun.com/pypi/web/packages/c5/48/ceb5cd698c52b72554722ef94e4c64909b030eb7b1001471ee1eb04ac478/dataprime-0.1.9.tar.gz +BuildArch: noarch + + +%description +## +# 数据智脑计算引擎dataprime + + +dataprime是一款数据计算引擎,提供了数据钻取、过滤、生成、采样、聚合、排序等功能,输入dataframe对象,调用不同方法,即可返回相应的结果, 并且可以结合celery进行异步调用 + + +## 一 快速开始 + +- 安装 + +`pip install dataprime` + +- 使用 +```python +import json + +from dataprime.dataprime import DataPrime +import pandas as pd + +# 准备数据 +data = [ + {"name": "test1", "time": "2021-01-05", "score": 10}, + {"name": "test2", "time": "2021-01-12", "score": 20}, + {"name": "test3", "time": "2021-01-24", "score": 30} +] + +# 过滤参数 +filter_list = [ + { + "column": "score", + "dtype": "int", + "filter": { + "condition": "greater_than", + "value": "10", + "max": 0, + "min": 0 + } + } +] + +data_frame = pd.read_json(json.dumps(data)) + +# 初始化一个DataPrime对象 +dp = DataPrime(dataframe=data_frame) + +# 调用方法进行处理, 返回一个dataframe对象 +result = dp.filter(filter_list).dataframe + +print(result) +``` + + +## 二 使用文档 +### 1.钻取 Drill +钻取是以一个时间字段为维度,将数据按年、月、日进行细分 + + + +- 参数字段 + +`drill_granularity_list` 钻取粒度列表,包含年、月、日 +`drill_node_list` 钻取节点列表 +`dimensions` 钻取字段 + + + +- 参数示例: +```python +drill_granularity_list = [ + { + "value": "YEAR", + "label": "年" + }, + { + "value": "MONTH", + "label": "月" + }, + { + "value": "DAY", + "label": "日" + } +] + +drill_node_list = [ + { + "granularity": "", + "point": "YEAR", + }, + { + "granularity": "YEAR", + "point": "2021", + }, + { + "granularity": "MONTH", + "point": "01", + } +] + +dimensions = [ + { + "column": "time", + "dtype": "datetime", + } +] +``` + + +- 使用示例 +```python +result = dp.drill(drill_granularity_list, drill_node_list, dimensions).dataframe +``` + + +### 2.过滤 Filter +过滤是根据某一个度量字段,通过一定条件来筛选需要的数据 + + +- 参数示例 +```python +filter_list = [ + { + "column": "score", # 过滤字段 + "dtype": "int", # 字段类型 + "filter": { + "condition": "less_than", # 过滤条件 + "value": "100", # 过滤值 + "max": 50, # 最大值, 区间判断时用到 + "min": 0 # 最小值, 区间判断时用到 + } + } +] +``` + + +- 使用示例 +```python +result = dp.filter(filter_list).dataframe +``` + + +- condition选项 + +| **条件** | **值** | +| --- | --- | +| 等于 | equal | +| 不等于 | not_equal | +| 区间内 | in_between | +| 区间外 | out_between | +| 大于 | greater_than | +| 大于等于 | equal_greater_than | +| 小于 | less_than | +| 小于等于 | equal_less_than | +| 包含 | contains | +| 不包含 | not_contains | + + + +### 3.生成器 Generator +生成器是在dataframe中新增一列,提供求和、求平均值、计算行等结果,例如有a、b两列,可以通过生成器生成"a-b-平均值"列 + + + +- 参数示例 +```python +generated_metric_list = [ + { + "dtype": "float", + "operate": { + "label": "均值", + "value": "Avg" # 生成类型 + }, + "source_col_list": [ # 源数据列 + { + "column": "age", + "dtype": "int" + }, + { + "column": "score", + "dtype": "int" + } + ], + "column": "age-score-均值" # 生成列名称 + } +] +``` + +- 使用示例 +```python +result = dp.generate(generated_metric_list).dataframe +``` + +- 生成类型 + +| **生成类型** | **值value** | +| --- | --- | +| 平均值 | Avg | +| 求和 | Sum | +| 计算行 | COUNT_COL | + + + +### 4.采样 Sampling +采样是指通过`等距采样`、`随机采样`等方式,对数据进行采样,获取一定数量的行 + + + +- 参数示例 +```python +sample = { + "number": 5, # 采样数量 + "type": "random" # 采样方式, 随机:random 等距:equidistant +} +``` + +- 使用示例 +```python +result = dp.sample(5, "random").dataframe +``` + + +### 5.聚合 Aggregate +聚合是对数值(度量)列进行聚合,可以进行计数、求和、求平均值等聚合操作 + + + +- 参数示例 +```python +aggregation_list = [ + { + "column": "age", # 源数据列 + "dtype": "int", + "aggregation_option": { + "label": "均值", + "value": "mean" # 聚合方法 + } + }, + { + "column": "score", + "dtype": "int", + "aggregation_option": { + "label": "均值", + "value": "mean" + } + } +] +``` + +- 可用的聚合方法 + +['count', 'sum', 'mean', 'median', 'std', 'var', 'max', 'min'] + + + +- 使用示例 +```python +result = dp.aggregate(aggregation_list).dataframe +``` + + +### 6.排序 Sorting +排序是根据某一个数据列进行升降排序 + + + +- 参数示例 +```python +sort = { + "condition": "desc", # 升序:desc 降序:asc + "column": "time" # 排序字段 +} +``` + +- 使用示例 +```python +result = dp.order_by("time", "desc").dataframe +``` + + +### 7.top N +top N是指按照某一列的值进行排序,取出前几行 + + + +- 参数示例 +```python +top_n_param = { + "column": "score", # 排序字段 + "condition": "desc", # 排序方式, 升序:desc 降序:asc + "limit": 3 # 取出数量 +} +``` + + +- 使用示例 +```python +result = dp.top_n("score", "desc", 3).dataframe +``` +### +### 8.统一入口 +除了上述的单个方法外,也可以使用`dataprime.process_df_operator()`来同时进行多个数据处理操作,参数字段与上面相同 + + + +- 使用示例 +```python +dataprime = DataPrime( + dataframe=data_frame, + **{ + "dimensions": dimension_list, + "metrics": metric_list, + "calculation_flow_nodes": calculation_flow_nodes, + "generated_metric_list": generated_metric_list, + "filter_list": filter_list, + "aggregation_list": aggregation_list, + "sample": sample, + "drill_granularity_list": drill_granularity_list, + "drill_node_list": drill_node_list, + "sort": sort_param, + "topn": top_n_param + } +) + +result = dataprime.process_df_operator() +``` +- calculation_flow_nodes为计算节点列表, 可选值有: + + `DRILLING` 钻取 + + `SAMPLING` 采样 + + `FILTER` 过滤 + + `AGGREGATION` 聚合 + + 例如需要钻取、采样两种计算,则`calculation_flow_nodes=["DRILLING", "SAMPLING"]`, 同时也需要传递这2个计算节点对应的参数`drill_node_list` `drill_granularity_list` `filter_list`. 使用多个计算节点时, 按照列表中元素的先后顺序来依次调用 + +### 9.链式调用 +上述的单个数据处理方法可以进行链式调用,如: + +`result = dp.order_by("time", "desc").sample(5, "random").dataframe` + +## 三 异步调用 +`dataprime`可以结合`celery`进行异步调用, 主要通过`AsyncDataprime`提供的接口来调用, 其方法名称与同步调用时一致, 返回结果为celery任务的id,使用示例如下: + +- 初始化celery工程文件 + + 执行以下代码 + ```python + from dataprime.dataprime import DataPrime + + dp = DataPrime() + dp.init_celery() + ``` + 将会在当前目录下生成 `dataprime_celery`文件夹, 包含以下文件: + + `celery_app.py` celery主入口文件 + + `config.py` celery配置文件, 需要手动配置broker_url和result_backend + + `task.py` celery任务文件 + +- celery的其余使用配置参考其官方文档: + + https://docs.celeryproject.org/en/stable/ + +- 启动celery worker + + 配置好celery的broker, backend等配置之后,执行以下命令启动 + + `celery -A dataprime_celery.celery_app worker -l INFO` + +- 发送异步任务请求 + +```python +from async_dataprime.executor import AsyncDataPrime + +# 初始化 +dataframe="A dataframe object" +adp = AsyncDataPrime() + +filter_list = [ + { + "column": "score", + "dtype": "int", + "filter": { + "condition": "greater_than", + "value": "10", + "max": 0, + "min": 0 + } + } +] + +# 调用方法进行处理, 返回异步任务的任务id +# 异步任务的方法和同步调用时的方式名称一致,调用对象换成了AsyncDataPrime对象 +task_id = adp.filter(dataframe=dataframe, filter_list=filter_list) +print(task_id) +``` + +- 获取结果 + + 异步任务的结果是以json的格式存储在事先配置的redis backend中, 通过task_id即可获取: +```python +# 执行任务 +adp = AsyncDataPrime() +task_id = adp.filter(data_frame, filter_list) + +# 等待任务完成 + +# 获取结果, datatype可选"json"或"dataframe" +res = adp.get_async_result(task_id, data_type="dataframe") +print(res) +``` + + +## 四 开源协议 + + 本项目采用 `木兰宽松许可证` + + http://license.coscl.org.cn/MulanPSL2/ + +%package -n python3-dataprime +Summary: 数据智脑开源计算引擎dataprime +Provides: python-dataprime +BuildRequires: python3-devel +BuildRequires: python3-setuptools +BuildRequires: python3-pip +%description -n python3-dataprime +## +# 数据智脑计算引擎dataprime + + +dataprime是一款数据计算引擎,提供了数据钻取、过滤、生成、采样、聚合、排序等功能,输入dataframe对象,调用不同方法,即可返回相应的结果, 并且可以结合celery进行异步调用 + + +## 一 快速开始 + +- 安装 + +`pip install dataprime` + +- 使用 +```python +import json + +from dataprime.dataprime import DataPrime +import pandas as pd + +# 准备数据 +data = [ + {"name": "test1", "time": "2021-01-05", "score": 10}, + {"name": "test2", "time": "2021-01-12", "score": 20}, + {"name": "test3", "time": "2021-01-24", "score": 30} +] + +# 过滤参数 +filter_list = [ + { + "column": "score", + "dtype": "int", + "filter": { + "condition": "greater_than", + "value": "10", + "max": 0, + "min": 0 + } + } +] + +data_frame = pd.read_json(json.dumps(data)) + +# 初始化一个DataPrime对象 +dp = DataPrime(dataframe=data_frame) + +# 调用方法进行处理, 返回一个dataframe对象 +result = dp.filter(filter_list).dataframe + +print(result) +``` + + +## 二 使用文档 +### 1.钻取 Drill +钻取是以一个时间字段为维度,将数据按年、月、日进行细分 + + + +- 参数字段 + +`drill_granularity_list` 钻取粒度列表,包含年、月、日 +`drill_node_list` 钻取节点列表 +`dimensions` 钻取字段 + + + +- 参数示例: +```python +drill_granularity_list = [ + { + "value": "YEAR", + "label": "年" + }, + { + "value": "MONTH", + "label": "月" + }, + { + "value": "DAY", + "label": "日" + } +] + +drill_node_list = [ + { + "granularity": "", + "point": "YEAR", + }, + { + "granularity": "YEAR", + "point": "2021", + }, + { + "granularity": "MONTH", + "point": "01", + } +] + +dimensions = [ + { + "column": "time", + "dtype": "datetime", + } +] +``` + + +- 使用示例 +```python +result = dp.drill(drill_granularity_list, drill_node_list, dimensions).dataframe +``` + + +### 2.过滤 Filter +过滤是根据某一个度量字段,通过一定条件来筛选需要的数据 + + +- 参数示例 +```python +filter_list = [ + { + "column": "score", # 过滤字段 + "dtype": "int", # 字段类型 + "filter": { + "condition": "less_than", # 过滤条件 + "value": "100", # 过滤值 + "max": 50, # 最大值, 区间判断时用到 + "min": 0 # 最小值, 区间判断时用到 + } + } +] +``` + + +- 使用示例 +```python +result = dp.filter(filter_list).dataframe +``` + + +- condition选项 + +| **条件** | **值** | +| --- | --- | +| 等于 | equal | +| 不等于 | not_equal | +| 区间内 | in_between | +| 区间外 | out_between | +| 大于 | greater_than | +| 大于等于 | equal_greater_than | +| 小于 | less_than | +| 小于等于 | equal_less_than | +| 包含 | contains | +| 不包含 | not_contains | + + + +### 3.生成器 Generator +生成器是在dataframe中新增一列,提供求和、求平均值、计算行等结果,例如有a、b两列,可以通过生成器生成"a-b-平均值"列 + + + +- 参数示例 +```python +generated_metric_list = [ + { + "dtype": "float", + "operate": { + "label": "均值", + "value": "Avg" # 生成类型 + }, + "source_col_list": [ # 源数据列 + { + "column": "age", + "dtype": "int" + }, + { + "column": "score", + "dtype": "int" + } + ], + "column": "age-score-均值" # 生成列名称 + } +] +``` + +- 使用示例 +```python +result = dp.generate(generated_metric_list).dataframe +``` + +- 生成类型 + +| **生成类型** | **值value** | +| --- | --- | +| 平均值 | Avg | +| 求和 | Sum | +| 计算行 | COUNT_COL | + + + +### 4.采样 Sampling +采样是指通过`等距采样`、`随机采样`等方式,对数据进行采样,获取一定数量的行 + + + +- 参数示例 +```python +sample = { + "number": 5, # 采样数量 + "type": "random" # 采样方式, 随机:random 等距:equidistant +} +``` + +- 使用示例 +```python +result = dp.sample(5, "random").dataframe +``` + + +### 5.聚合 Aggregate +聚合是对数值(度量)列进行聚合,可以进行计数、求和、求平均值等聚合操作 + + + +- 参数示例 +```python +aggregation_list = [ + { + "column": "age", # 源数据列 + "dtype": "int", + "aggregation_option": { + "label": "均值", + "value": "mean" # 聚合方法 + } + }, + { + "column": "score", + "dtype": "int", + "aggregation_option": { + "label": "均值", + "value": "mean" + } + } +] +``` + +- 可用的聚合方法 + +['count', 'sum', 'mean', 'median', 'std', 'var', 'max', 'min'] + + + +- 使用示例 +```python +result = dp.aggregate(aggregation_list).dataframe +``` + + +### 6.排序 Sorting +排序是根据某一个数据列进行升降排序 + + + +- 参数示例 +```python +sort = { + "condition": "desc", # 升序:desc 降序:asc + "column": "time" # 排序字段 +} +``` + +- 使用示例 +```python +result = dp.order_by("time", "desc").dataframe +``` + + +### 7.top N +top N是指按照某一列的值进行排序,取出前几行 + + + +- 参数示例 +```python +top_n_param = { + "column": "score", # 排序字段 + "condition": "desc", # 排序方式, 升序:desc 降序:asc + "limit": 3 # 取出数量 +} +``` + + +- 使用示例 +```python +result = dp.top_n("score", "desc", 3).dataframe +``` +### +### 8.统一入口 +除了上述的单个方法外,也可以使用`dataprime.process_df_operator()`来同时进行多个数据处理操作,参数字段与上面相同 + + + +- 使用示例 +```python +dataprime = DataPrime( + dataframe=data_frame, + **{ + "dimensions": dimension_list, + "metrics": metric_list, + "calculation_flow_nodes": calculation_flow_nodes, + "generated_metric_list": generated_metric_list, + "filter_list": filter_list, + "aggregation_list": aggregation_list, + "sample": sample, + "drill_granularity_list": drill_granularity_list, + "drill_node_list": drill_node_list, + "sort": sort_param, + "topn": top_n_param + } +) + +result = dataprime.process_df_operator() +``` +- calculation_flow_nodes为计算节点列表, 可选值有: + + `DRILLING` 钻取 + + `SAMPLING` 采样 + + `FILTER` 过滤 + + `AGGREGATION` 聚合 + + 例如需要钻取、采样两种计算,则`calculation_flow_nodes=["DRILLING", "SAMPLING"]`, 同时也需要传递这2个计算节点对应的参数`drill_node_list` `drill_granularity_list` `filter_list`. 使用多个计算节点时, 按照列表中元素的先后顺序来依次调用 + +### 9.链式调用 +上述的单个数据处理方法可以进行链式调用,如: + +`result = dp.order_by("time", "desc").sample(5, "random").dataframe` + +## 三 异步调用 +`dataprime`可以结合`celery`进行异步调用, 主要通过`AsyncDataprime`提供的接口来调用, 其方法名称与同步调用时一致, 返回结果为celery任务的id,使用示例如下: + +- 初始化celery工程文件 + + 执行以下代码 + ```python + from dataprime.dataprime import DataPrime + + dp = DataPrime() + dp.init_celery() + ``` + 将会在当前目录下生成 `dataprime_celery`文件夹, 包含以下文件: + + `celery_app.py` celery主入口文件 + + `config.py` celery配置文件, 需要手动配置broker_url和result_backend + + `task.py` celery任务文件 + +- celery的其余使用配置参考其官方文档: + + https://docs.celeryproject.org/en/stable/ + +- 启动celery worker + + 配置好celery的broker, backend等配置之后,执行以下命令启动 + + `celery -A dataprime_celery.celery_app worker -l INFO` + +- 发送异步任务请求 + +```python +from async_dataprime.executor import AsyncDataPrime + +# 初始化 +dataframe="A dataframe object" +adp = AsyncDataPrime() + +filter_list = [ + { + "column": "score", + "dtype": "int", + "filter": { + "condition": "greater_than", + "value": "10", + "max": 0, + "min": 0 + } + } +] + +# 调用方法进行处理, 返回异步任务的任务id +# 异步任务的方法和同步调用时的方式名称一致,调用对象换成了AsyncDataPrime对象 +task_id = adp.filter(dataframe=dataframe, filter_list=filter_list) +print(task_id) +``` + +- 获取结果 + + 异步任务的结果是以json的格式存储在事先配置的redis backend中, 通过task_id即可获取: +```python +# 执行任务 +adp = AsyncDataPrime() +task_id = adp.filter(data_frame, filter_list) + +# 等待任务完成 + +# 获取结果, datatype可选"json"或"dataframe" +res = adp.get_async_result(task_id, data_type="dataframe") +print(res) +``` + + +## 四 开源协议 + + 本项目采用 `木兰宽松许可证` + + http://license.coscl.org.cn/MulanPSL2/ + +%package help +Summary: Development documents and examples for dataprime +Provides: python3-dataprime-doc +%description help +## +# 数据智脑计算引擎dataprime + + +dataprime是一款数据计算引擎,提供了数据钻取、过滤、生成、采样、聚合、排序等功能,输入dataframe对象,调用不同方法,即可返回相应的结果, 并且可以结合celery进行异步调用 + + +## 一 快速开始 + +- 安装 + +`pip install dataprime` + +- 使用 +```python +import json + +from dataprime.dataprime import DataPrime +import pandas as pd + +# 准备数据 +data = [ + {"name": "test1", "time": "2021-01-05", "score": 10}, + {"name": "test2", "time": "2021-01-12", "score": 20}, + {"name": "test3", "time": "2021-01-24", "score": 30} +] + +# 过滤参数 +filter_list = [ + { + "column": "score", + "dtype": "int", + "filter": { + "condition": "greater_than", + "value": "10", + "max": 0, + "min": 0 + } + } +] + +data_frame = pd.read_json(json.dumps(data)) + +# 初始化一个DataPrime对象 +dp = DataPrime(dataframe=data_frame) + +# 调用方法进行处理, 返回一个dataframe对象 +result = dp.filter(filter_list).dataframe + +print(result) +``` + + +## 二 使用文档 +### 1.钻取 Drill +钻取是以一个时间字段为维度,将数据按年、月、日进行细分 + + + +- 参数字段 + +`drill_granularity_list` 钻取粒度列表,包含年、月、日 +`drill_node_list` 钻取节点列表 +`dimensions` 钻取字段 + + + +- 参数示例: +```python +drill_granularity_list = [ + { + "value": "YEAR", + "label": "年" + }, + { + "value": "MONTH", + "label": "月" + }, + { + "value": "DAY", + "label": "日" + } +] + +drill_node_list = [ + { + "granularity": "", + "point": "YEAR", + }, + { + "granularity": "YEAR", + "point": "2021", + }, + { + "granularity": "MONTH", + "point": "01", + } +] + +dimensions = [ + { + "column": "time", + "dtype": "datetime", + } +] +``` + + +- 使用示例 +```python +result = dp.drill(drill_granularity_list, drill_node_list, dimensions).dataframe +``` + + +### 2.过滤 Filter +过滤是根据某一个度量字段,通过一定条件来筛选需要的数据 + + +- 参数示例 +```python +filter_list = [ + { + "column": "score", # 过滤字段 + "dtype": "int", # 字段类型 + "filter": { + "condition": "less_than", # 过滤条件 + "value": "100", # 过滤值 + "max": 50, # 最大值, 区间判断时用到 + "min": 0 # 最小值, 区间判断时用到 + } + } +] +``` + + +- 使用示例 +```python +result = dp.filter(filter_list).dataframe +``` + + +- condition选项 + +| **条件** | **值** | +| --- | --- | +| 等于 | equal | +| 不等于 | not_equal | +| 区间内 | in_between | +| 区间外 | out_between | +| 大于 | greater_than | +| 大于等于 | equal_greater_than | +| 小于 | less_than | +| 小于等于 | equal_less_than | +| 包含 | contains | +| 不包含 | not_contains | + + + +### 3.生成器 Generator +生成器是在dataframe中新增一列,提供求和、求平均值、计算行等结果,例如有a、b两列,可以通过生成器生成"a-b-平均值"列 + + + +- 参数示例 +```python +generated_metric_list = [ + { + "dtype": "float", + "operate": { + "label": "均值", + "value": "Avg" # 生成类型 + }, + "source_col_list": [ # 源数据列 + { + "column": "age", + "dtype": "int" + }, + { + "column": "score", + "dtype": "int" + } + ], + "column": "age-score-均值" # 生成列名称 + } +] +``` + +- 使用示例 +```python +result = dp.generate(generated_metric_list).dataframe +``` + +- 生成类型 + +| **生成类型** | **值value** | +| --- | --- | +| 平均值 | Avg | +| 求和 | Sum | +| 计算行 | COUNT_COL | + + + +### 4.采样 Sampling +采样是指通过`等距采样`、`随机采样`等方式,对数据进行采样,获取一定数量的行 + + + +- 参数示例 +```python +sample = { + "number": 5, # 采样数量 + "type": "random" # 采样方式, 随机:random 等距:equidistant +} +``` + +- 使用示例 +```python +result = dp.sample(5, "random").dataframe +``` + + +### 5.聚合 Aggregate +聚合是对数值(度量)列进行聚合,可以进行计数、求和、求平均值等聚合操作 + + + +- 参数示例 +```python +aggregation_list = [ + { + "column": "age", # 源数据列 + "dtype": "int", + "aggregation_option": { + "label": "均值", + "value": "mean" # 聚合方法 + } + }, + { + "column": "score", + "dtype": "int", + "aggregation_option": { + "label": "均值", + "value": "mean" + } + } +] +``` + +- 可用的聚合方法 + +['count', 'sum', 'mean', 'median', 'std', 'var', 'max', 'min'] + + + +- 使用示例 +```python +result = dp.aggregate(aggregation_list).dataframe +``` + + +### 6.排序 Sorting +排序是根据某一个数据列进行升降排序 + + + +- 参数示例 +```python +sort = { + "condition": "desc", # 升序:desc 降序:asc + "column": "time" # 排序字段 +} +``` + +- 使用示例 +```python +result = dp.order_by("time", "desc").dataframe +``` + + +### 7.top N +top N是指按照某一列的值进行排序,取出前几行 + + + +- 参数示例 +```python +top_n_param = { + "column": "score", # 排序字段 + "condition": "desc", # 排序方式, 升序:desc 降序:asc + "limit": 3 # 取出数量 +} +``` + + +- 使用示例 +```python +result = dp.top_n("score", "desc", 3).dataframe +``` +### +### 8.统一入口 +除了上述的单个方法外,也可以使用`dataprime.process_df_operator()`来同时进行多个数据处理操作,参数字段与上面相同 + + + +- 使用示例 +```python +dataprime = DataPrime( + dataframe=data_frame, + **{ + "dimensions": dimension_list, + "metrics": metric_list, + "calculation_flow_nodes": calculation_flow_nodes, + "generated_metric_list": generated_metric_list, + "filter_list": filter_list, + "aggregation_list": aggregation_list, + "sample": sample, + "drill_granularity_list": drill_granularity_list, + "drill_node_list": drill_node_list, + "sort": sort_param, + "topn": top_n_param + } +) + +result = dataprime.process_df_operator() +``` +- calculation_flow_nodes为计算节点列表, 可选值有: + + `DRILLING` 钻取 + + `SAMPLING` 采样 + + `FILTER` 过滤 + + `AGGREGATION` 聚合 + + 例如需要钻取、采样两种计算,则`calculation_flow_nodes=["DRILLING", "SAMPLING"]`, 同时也需要传递这2个计算节点对应的参数`drill_node_list` `drill_granularity_list` `filter_list`. 使用多个计算节点时, 按照列表中元素的先后顺序来依次调用 + +### 9.链式调用 +上述的单个数据处理方法可以进行链式调用,如: + +`result = dp.order_by("time", "desc").sample(5, "random").dataframe` + +## 三 异步调用 +`dataprime`可以结合`celery`进行异步调用, 主要通过`AsyncDataprime`提供的接口来调用, 其方法名称与同步调用时一致, 返回结果为celery任务的id,使用示例如下: + +- 初始化celery工程文件 + + 执行以下代码 + ```python + from dataprime.dataprime import DataPrime + + dp = DataPrime() + dp.init_celery() + ``` + 将会在当前目录下生成 `dataprime_celery`文件夹, 包含以下文件: + + `celery_app.py` celery主入口文件 + + `config.py` celery配置文件, 需要手动配置broker_url和result_backend + + `task.py` celery任务文件 + +- celery的其余使用配置参考其官方文档: + + https://docs.celeryproject.org/en/stable/ + +- 启动celery worker + + 配置好celery的broker, backend等配置之后,执行以下命令启动 + + `celery -A dataprime_celery.celery_app worker -l INFO` + +- 发送异步任务请求 + +```python +from async_dataprime.executor import AsyncDataPrime + +# 初始化 +dataframe="A dataframe object" +adp = AsyncDataPrime() + +filter_list = [ + { + "column": "score", + "dtype": "int", + "filter": { + "condition": "greater_than", + "value": "10", + "max": 0, + "min": 0 + } + } +] + +# 调用方法进行处理, 返回异步任务的任务id +# 异步任务的方法和同步调用时的方式名称一致,调用对象换成了AsyncDataPrime对象 +task_id = adp.filter(dataframe=dataframe, filter_list=filter_list) +print(task_id) +``` + +- 获取结果 + + 异步任务的结果是以json的格式存储在事先配置的redis backend中, 通过task_id即可获取: +```python +# 执行任务 +adp = AsyncDataPrime() +task_id = adp.filter(data_frame, filter_list) + +# 等待任务完成 + +# 获取结果, datatype可选"json"或"dataframe" +res = adp.get_async_result(task_id, data_type="dataframe") +print(res) +``` + + +## 四 开源协议 + + 本项目采用 `木兰宽松许可证` + + http://license.coscl.org.cn/MulanPSL2/ + +%prep +%autosetup -n dataprime-0.1.9 + +%build +%py3_build + +%install +%py3_install +install -d -m755 %{buildroot}/%{_pkgdocdir} +if [ -d doc ]; then cp -arf doc %{buildroot}/%{_pkgdocdir}; fi +if [ -d docs ]; then cp -arf docs %{buildroot}/%{_pkgdocdir}; fi +if [ -d example ]; then cp -arf example %{buildroot}/%{_pkgdocdir}; fi +if [ -d examples ]; then cp -arf examples %{buildroot}/%{_pkgdocdir}; fi +pushd %{buildroot} +if [ -d usr/lib ]; then + find usr/lib -type f -printf "\"/%h/%f\"\n" >> filelist.lst +fi +if [ -d usr/lib64 ]; then + find usr/lib64 -type f -printf "\"/%h/%f\"\n" >> filelist.lst +fi +if [ -d usr/bin ]; then + find usr/bin -type f -printf "\"/%h/%f\"\n" >> filelist.lst +fi +if [ -d usr/sbin ]; then + find usr/sbin -type f -printf "\"/%h/%f\"\n" >> filelist.lst +fi +touch doclist.lst +if [ -d usr/share/man ]; then + find usr/share/man -type f -printf "\"/%h/%f.gz\"\n" >> doclist.lst +fi +popd +mv %{buildroot}/filelist.lst . +mv %{buildroot}/doclist.lst . + +%files -n python3-dataprime -f filelist.lst +%dir %{python3_sitelib}/* + +%files help -f doclist.lst +%{_docdir}/* + +%changelog +* Tue Jun 20 2023 Python_Bot <Python_Bot@openeuler.org> - 0.1.9-1 +- Package Spec generated @@ -0,0 +1 @@ +14d49b2898628511f72ca316794b3410 dataprime-0.1.9.tar.gz |