%global _empty_manifest_terminate_build 0
Name: python-paradag
Version: 1.2.0
Release: 1
Summary: A robust DAG implementation for parallel programming
License: MIT
URL: https://github.com/xianghuzhao/paradag
Source0: https://mirrors.nju.edu.cn/pypi/web/packages/59/f0/9aa8f1b14c60a25554a12c40c0eda1d3402d86e7a0cb0e7e80063ecf07cd/paradag-1.2.0.tar.gz
BuildArch: noarch

%description
# paradag

[![PyPI](https://badge.fury.io/py/paradag.svg)](https://pypi.org/project/paradag/)
[![Travis CI Status](https://travis-ci.org/xianghuzhao/paradag.svg?branch=master)](https://travis-ci.org/xianghuzhao/paradag)
[![Code Climate](https://codeclimate.com/github/xianghuzhao/paradag/badges/gpa.svg)](https://codeclimate.com/github/xianghuzhao/paradag)

`paradag` is a robust DAG package for easy parallel execution.
`paradag` is implemented in pure Python and does not depend on any third-party packages.

A [directed acyclic graph (DAG)](https://en.wikipedia.org/wiki/Directed_acyclic_graph)
is commonly used as a dependency graph. It can describe the dependencies between tasks.
The order of task execution must comply with the dependencies:
tasks connected by a direct or indirect path must run in sequence,
and tasks without any connection can run in parallel.

## Installation

```shell
$ pip install paradag
```

## Create a DAG

Before running tasks, first create a DAG, with each vertex representing a task.
A vertex of a DAG instance can be any
[hashable object](https://docs.python.org/3/glossary.html#term-hashable),
such as an integer, a string, a tuple of hashable objects, or an instance of a user-defined class.

```python
from paradag import DAG

class Vtx(object):
    def __init__(self, v):
        self.__value = v

vtx = Vtx(999)

dag = DAG()
dag.add_vertex(123, 'abcde', 'xyz', ('a', 'b', 3), vtx)

dag.add_edge(123, 'abcde')                 # 123 -> 'abcde'
dag.add_edge('abcde', ('a', 'b', 3), vtx)  # 'abcde' -> ('a', 'b', 3), 'abcde' -> vtx
```

`add_edge` accepts one starting vertex and one or more ending vertices.
Take care not to create a cycle with `add_edge`; doing so raises a `DAGCycleError`.

The common DAG properties are accessible:

```python
print(dag.vertex_size())
print(dag.edge_size())

print(dag.successors('abcde'))
print(dag.predecessors(vtx))

print(dag.all_starts())
print(dag.all_terminals())
```

## Run tasks in sequence

Write your executor and, optionally, a selector.
The executor handles the actual execution of each vertex.

```python
from paradag import dag_run
from paradag import SequentialProcessor

class CustomExecutor:
    def param(self, vertex):
        # The value returned here is passed to execute()
        return vertex

    def execute(self, param):
        print('Executing:', param)

print(dag_run(dag, processor=SequentialProcessor(), executor=CustomExecutor()))
```

`dag_run` is the core function for task scheduling.

## Run tasks in parallel

Running tasks in parallel is very similar;
only the processor changes to `MultiThreadProcessor`.

```python
from paradag import MultiThreadProcessor

dag_run(dag, processor=MultiThreadProcessor(), executor=CustomExecutor())
```

The default selector, `FullSelector`, tries to find as many tasks
as possible that can run in parallel.
This can be adjusted with a custom selector.
The following selector allows at most 4 tasks to run in parallel.

```python
class CustomSelector(object):
    def select(self, running, idle):
        # Number of free slots left out of a maximum of 4
        task_number = max(0, 4-len(running))
        return list(idle)[:task_number]

dag_run(dag, processor=MultiThreadProcessor(), selector=CustomSelector(), executor=CustomExecutor())
```

When using `MultiThreadProcessor`, keep in mind that
the executor's `execute` method may run in parallel.
Avoid modifying any variables outside the `execute` function,
and pass all parameters through the `param` argument.
Also make sure that the values returned by the `param` function
are independent of each other.

## Get task running status

The executor can also implement optional methods
that report the task running status.
```python
class CustomExecutor:
    def param(self, vertex):
        return vertex

    def execute(self, param):
        print('Executing:', param)

    def report_start(self, vertices):
        print('Start to run:', vertices)

    def report_running(self, vertices):
        print('Current running:', vertices)

    def report_finish(self, vertices_result):
        for vertex, result in vertices_result:
            print('Finished running {0} with result: {1}'.format(vertex, result))

dag_run(dag, processor=MultiThreadProcessor(), executor=CustomExecutor())
```

## Deliver result to descendants

If the result of a task is needed by its descendants,
the executor can implement the `deliver` method.

```python
class CustomExecutor:
    def __init__(self):
        self.__level = {}

    def param(self, vertex):
        return self.__level.get(vertex, 0)

    def execute(self, param):
        return param + 1

    def report_finish(self, vertices_result):
        for vertex, result in vertices_result:
            print('Vertex {0} finished, level: {1}'.format(vertex, result))

    def deliver(self, vertex, result):
        self.__level[vertex] = result
```

The result from a parent is delivered to the vertex before it executes.

## Topological sorting

[Topological sorting](https://en.wikipedia.org/wiki/Topological_sorting)
can also be done with the `paradag.dag_run` function.
The return value of `dag_run` can be treated as
the result of a topological sort.

A simple topological sort without any execution:

```python
from paradag import DAG, dag_run
from paradag import SingleSelector, RandomSelector, ShuffleSelector

dag = DAG()
dag.add_vertex(1, 2, 3, 4, 5)
dag.add_edge(1, 4)
dag.add_edge(4, 2, 5)

print(dag_run(dag))
print(dag_run(dag, selector=SingleSelector()))
print(dag_run(dag, selector=RandomSelector()))
print(dag_run(dag, selector=ShuffleSelector()))
```

The solution to a topological sort is not necessarily unique,
and the final order may vary with different selectors.
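
Because several orders can be valid, it may help to check an order against the edges rather than compare it with one expected list. The following is a minimal sketch in plain Python, independent of `paradag`; the helper name `is_topological_order` is hypothetical and not part of the package. It uses the same vertices and edges as the example above.

```python
def is_topological_order(order, edges, vertices):
    """Return True if `order` lists every vertex exactly once and respects every edge."""
    # Every vertex must appear exactly once.
    if len(order) != len(vertices) or set(order) != set(vertices):
        return False
    # Map each vertex to its position in the candidate order.
    position = {vertex: index for index, vertex in enumerate(order)}
    # Each edge (start, end) requires start to come before end.
    return all(position[start] < position[end] for start, end in edges)


# Same graph as above: 1 -> 4, 4 -> 2, 4 -> 5; vertex 3 is isolated.
vertices = [1, 2, 3, 4, 5]
edges = [(1, 4), (4, 2), (4, 5)]

print(is_topological_order([1, 3, 4, 2, 5], edges, vertices))  # True
print(is_topological_order([3, 1, 4, 5, 2], edges, vertices))  # True
print(is_topological_order([4, 1, 2, 3, 5], edges, vertices))  # False: 4 comes before 1
```

Assuming the value returned by `dag_run` is a flat sequence of vertices, it could be passed to this helper as `order` to confirm that each selector produces a valid result.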

%package -n python3-paradag
Summary: A robust DAG implementation for parallel programming
Provides: python-paradag
BuildRequires: python3-devel
BuildRequires: python3-setuptools
BuildRequires: python3-pip

%description -n python3-paradag
`paradag` is a robust DAG package for easy parallel execution, implemented in pure Python with no third-party dependencies.

%package help
Summary: Development documents and examples for paradag
Provides: python3-paradag-doc

%description help
Development documents and examples for paradag.

%prep
%autosetup -n paradag-1.2.0

%build
%py3_build

%install
%py3_install
install -d -m755 %{buildroot}/%{_pkgdocdir}
if [ -d doc ]; then cp -arf doc %{buildroot}/%{_pkgdocdir}; fi
if [ -d docs ]; then cp -arf docs %{buildroot}/%{_pkgdocdir}; fi
if [ -d example ]; then cp -arf example %{buildroot}/%{_pkgdocdir}; fi
if [ -d examples ]; then cp -arf examples %{buildroot}/%{_pkgdocdir}; fi
pushd %{buildroot}
if [ -d usr/lib ]; then
    find usr/lib -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/lib64 ]; then
    find usr/lib64 -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/bin ]; then
    find usr/bin -type f -printf "/%h/%f\n" >> filelist.lst
fi
if [ -d usr/sbin ]; then
    find usr/sbin -type f -printf "/%h/%f\n" >> filelist.lst
fi
touch doclist.lst
if [ -d usr/share/man ]; then
    find usr/share/man -type f -printf "/%h/%f.gz\n" >> doclist.lst
fi
popd
mv %{buildroot}/filelist.lst .
mv %{buildroot}/doclist.lst .

%files -n python3-paradag -f filelist.lst
%dir %{python3_sitelib}/*

%files help -f doclist.lst
%{_docdir}/*

%changelog
* Wed May 10 2023 Python_Bot - 1.2.0-1
- Package Spec generated