%global _empty_manifest_terminate_build 0
Name:           python-coinstac-computation
Version:        1.0.2
Release:        1
Summary:        Generic computation implementation on COINSTAC.
License:        MIT
URL:            https://github.com/trendscenter/coinstac-computation
Source0:        https://mirrors.aliyun.com/pypi/web/packages/ea/99/7e70962defea4d05e5d9ab73b9f941890481f0846d3fbe7bc777e58936ec/coinstac-computation-1.0.2.tar.gz
BuildArch:      noarch

%description
#### **ComputationPhase** signature:
```python
from coinstac_computation import ComputationPhase


class PhaseLoadData(ComputationPhase):
    def _initialize(self):
        """Put anything that needs to be initialized only once here."""
        pass

    def compute(self):
        out = {}
        # To end a multi-iteration phase and move to the next phase (local or remote), set:
        out['jump_to_next'] = True
        # To stop the computation (remote only), set:
        out['success'] = True
        return out
```

### Example: Gather the max even number from each site
#### A full working use case is in the [examples/basic](./examples/basic) directory, where:
* Each local site keeps only its even numbers and sends them to the remote.
* The remote takes the maximum of each site's even numbers and returns the list of per-site maxima to every site.
* Each site saves the final result.
#### inputspec.json data:
```json
[
  {
    "data": {
      "value": [10, 3, 5, 6, 7, 8, 12, 38, 32, 789, 776, 441]
    }
  },
  {
    "data": {
      "value": [12, 33, 88, 61, 37, 58, 103, 3386, 312, 9, 77, 41]
    }
  }
]
```

### Local node pipeline:
```python
import os

from coinstac_computation import COINSTACPyNode, ComputationPhase


class PhaseLoadData(ComputationPhase):
    def compute(self):
        # Keep only the even numbers from this site's input.
        data = []
        for d in self.input['data']:
            if d % 2 == 0:
                data.append(d)
        return {'filtered_data': data}


class PhaseSaveResult(ComputationPhase):
    def compute(self):
        # Write the aggregated result returned by the remote.
        with open(os.path.join(self.out_dir, 'results.txt'), 'w') as out:
            out.write(f"{self.input['aggregated_data']}")


local = COINSTACPyNode(mode='local', debug=True)
local.add_phase(PhaseLoadData)
local.add_phase(PhaseSaveResult)
```

### Remote node pipeline:
```python
from coinstac_computation import COINSTACPyNode, ComputationPhase, PhaseEndWithSuccess


class PhaseCollectMaxEvenData(ComputationPhase):
    def compute(self):
        # self.input maps each site ID to that site's output; take the max of each site's even numbers.
        data = []
        for site, site_vars in self.input.items():
            site_max = max(site_vars['filtered_data'])
            data.append(site_max)
        return {'aggregated_data': data}


remote = COINSTACPyNode(mode='remote', debug=True)
remote.add_phase(PhaseCollectMaxEvenData)
remote.add_phase(PhaseEndWithSuccess)
```

### Entry point:
```python
import coinstac

from local_pipeline import local
from remote_pipeline import remote

coinstac.start(local, remote)
```

### Run:
```
cd examples/basic/
docker build -t base . && coinstac-simulator
```
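To make the data flow concrete, the following plain-Python sketch mirrors the two compute steps above on the `inputspec.json` values without COINSTAC. It is only an illustration. The helper names `filter_even` and `collect_max` are hypothetical, and the site keys `local0` and `local1` are assumed from the simulator naming that appears in the sample logs.

```python
def filter_even(values):
    """Local step (like PhaseLoadData): keep only the even numbers."""
    return [v for v in values if v % 2 == 0]


def collect_max(site_outputs):
    """Remote step (like PhaseCollectMaxEvenData): max of each site's filtered data."""
    return [max(out['filtered_data']) for out in site_outputs.values()]


# Values from inputspec.json; the site keys are assumed, not read from COINSTAC.
sites = {
    'local0': [10, 3, 5, 6, 7, 8, 12, 38, 32, 789, 776, 441],
    'local1': [12, 33, 88, 61, 37, 58, 103, 3386, 312, 9, 77, 41],
}

site_outputs = {site: {'filtered_data': filter_even(values)} for site, values in sites.items()}
aggregated = collect_max(site_outputs)
print(aggregated)  # [776, 3386]
```

Both this sketch and the remote phase above raise `ValueError` from `max()` if a site has no even numbers.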
### Advanced use cases:
#### 1. Multiple local <---> remote iterations [example](./examples/multi_iterations):
* Each site casts a vote (positive if the number is even) multiple times (default: 51).
* The remote gathers the votes and, at the end, returns the final voting result to all sites.
* Each site saves the final result.

#### Overview:
1. Specify when to end the iterative phase by sending the phase-jump signal `jump_to_next=True`:
```python
import os

from coinstac_computation import ComputationPhase


class PhaseSubmitVote(ComputationPhase):
    def _initialize(self):
        """This method runs only once."""
        self.cache['data_index'] = 0
        self.cache['data'] = []
        with open(os.path.join(self.base_dir, self.input_args['data_source'])) as data_file:
            for line in data_file:
                self.cache['data'].append(float(line.strip()))

    def compute(self):
        out = {
            'vote': self.cache['data'][self.cache['data_index']] % 2 == 0,
        }
        self.cache['data_index'] += 1
        # Send the jump-to-next-phase signal once the last value has been voted on.
        out['jump_to_next'] = self.cache['data_index'] > len(self.cache['data']) - 1
        return out
```
2. Add the phase as a multi-iteration phase:
```python
local.add_phase(PhaseSubmitVote, multi_iterations=True)
```

#### 2. Send data across local <---> remote [example](./examples/data_transfer):
To keep it simple, each site sends a 1000 x 1000 matrix to the remote, which aggregates the matrices by averaging and returns the result to every site.
Local:
```python
import os

import numpy as np
from coinstac_computation import COINSTACPyNode, ComputationPhase


class PhaseLoadData(ComputationPhase):
    def compute(self):
        out = {}
        # Generate a random matrix with the shape given in the input.
        data = np.random.randn(*self.input['matrix_shape'])
        out.update(**self.send("site_matrix", data))
        return out


class PhaseSaveResult(ComputationPhase):
    def compute(self):
        data = self.recv('averaged_matrix')
        np.save(os.path.join(self.out_dir, "averaged_matrix.npy"), data)


local = COINSTACPyNode(mode='local', debug=True)
local.add_phase(PhaseLoadData)
local.add_phase(PhaseSaveResult)
```

Remote:
```python
import numpy as np
from coinstac_computation import COINSTACPyNode, ComputationPhase, PhaseEndWithSuccess


class PhaseAggregateMatrix(ComputationPhase):
    def compute(self):
        out = {}
        # Receive the site matrices and average them element-wise across sites.
        data = self.recv("site_matrix")
        mean_data = np.array(data).mean(0)
        out.update(**self.send("averaged_matrix", mean_data))
        return out


remote = COINSTACPyNode(mode='remote', debug=True)
remote.add_phase(PhaseAggregateMatrix)
remote.add_phase(PhaseEndWithSuccess)
```
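The aggregation in `PhaseAggregateMatrix` reduces to one NumPy operation. Stacking the per-site matrices gives an array of shape `(n_sites, rows, cols)`, and `mean(0)` averages over the site axis. The sketch below checks this with small random matrices in place of the 1000 x 1000 ones. It assumes, based on the `np.array(data).mean(0)` call, that `self.recv("site_matrix")` on the remote yields one matrix per site. The helper name `average_site_matrices` and the three-site setup are hypothetical.

```python
import numpy as np


def average_site_matrices(site_matrices):
    """Stack one matrix per site into (n_sites, rows, cols) and average over the site axis."""
    return np.array(site_matrices).mean(0)


rng = np.random.default_rng(0)
shape = (4, 3)  # small stand-in for the 1000 x 1000 matrices in the example
site_matrices = [rng.standard_normal(shape) for _ in range(3)]  # three hypothetical sites

averaged = average_site_matrices(site_matrices)
print(averaged.shape)  # (4, 3)
print(np.allclose(averaged, sum(site_matrices) / len(site_matrices)))  # True
```

Averaging over axis 0 keeps each site's matrix shape. Every site therefore gets back a matrix of the same shape it sent.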
### Sample logs from local0
```
[INPUT] 14:27:36 02/04/2022 ->{'data_source': 'data_file.txt'}
[CACHE] 14:27:36 02/04/2022 ->{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 0, 'PhaseSaveResult': 0}}}
<-{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 1, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 1, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True}
[OUTPUT] 14:27:36 02/04/2022 <-{'output': {'vote': True, 'jump_to_next': False}}
[INPUT] 14:27:37 02/04/2022 ->{}
[CACHE] 14:27:37 02/04/2022 ->{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 1, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 1, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True}
<-{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 2, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 2, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True}
[OUTPUT] 14:27:37 02/04/2022 <-{'output': {'vote': False, 'jump_to_next': False}}
[INPUT] 14:27:37 02/04/2022 ->{}
[CACHE] 14:27:37 02/04/2022 ->{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 2, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 2, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True}
<-{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 3, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 3, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True}
[OUTPUT] 14:27:37 02/04/2022 <-{'output': {'vote': True, 'jump_to_next': False}}
```

### Sample logs from remote
```
[INPUT] 14:27:37 02/04/2022 ->{'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}
[CACHE] 14:27:37 02/04/2022 ->{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 0, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}}
<-{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 1, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3]], 'PHASE:PhaseCollectVote': True}
[OUTPUT] 14:27:37 02/04/2022 <-{'output': {}, 'success': False}
[INPUT] 14:27:37 02/04/2022 ->{'local0': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': True, 'jump_to_next': False}, 'local3': {'vote': True, 'jump_to_next': False}}
[CACHE] 14:27:37 02/04/2022 ->{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 1, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3]], 'PHASE:PhaseCollectVote': True}
<-{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 2, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3], [3, 1]], 'PHASE:PhaseCollectVote': True}
[OUTPUT] 14:27:37 02/04/2022 <-{'output': {}, 'success': False}
[INPUT] 14:27:37 02/04/2022 ->{'local0': {'vote': True, 'jump_to_next': False}, 'local3': {'vote': True, 'jump_to_next': False}, 'local2': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': True, 'jump_to_next': False}}
[CACHE] 14:27:37 02/04/2022 ->{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 2, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3], [3, 1]], 'PHASE:PhaseCollectVote': True}
<-{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 3, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3], [3, 1], [4, 0]], 'PHASE:PhaseCollectVote': True}
[OUTPUT] 14:27:37 02/04/2022 <-{'output': {}, 'success': False}
```
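The logs above can be reproduced by hand with plain Python. The remote's `vote_ballot` appears to hold one `[positive, negative]` count per round. The sketch below makes that reading concrete. It is not the example's actual `PhaseCollectVote` code, which is not shown here. `site_votes` copies the vote and `jump_to_next` logic of `PhaseSubmitVote.compute()`, and `tally` is a hypothetical counting helper. The per-round site outputs are copied from the remote log, with `jump_to_next` left out.

```python
def site_votes(values):
    """Yield (vote, jump_to_next) per iteration, following PhaseSubmitVote.compute()."""
    for index, value in enumerate(values):
        vote = value % 2 == 0  # positive vote for an even value
        # compute() increments data_index before the check, so the last value sets the flag.
        jump_to_next = index + 1 > len(values) - 1
        yield vote, jump_to_next


def tally(round_inputs):
    """Return [positive, negative] vote counts for one round of site outputs."""
    positive = sum(1 for site_out in round_inputs.values() if site_out['vote'])
    return [positive, len(round_inputs) - positive]


# First three values of local0's data file, taken from the local0 log.
print(list(site_votes([712.0, 309.0, 574.0])))
# [(True, False), (False, False), (True, True)]; with only three values, the third ends the phase

# Site outputs received by the remote in each round, taken from the remote log.
rounds = [
    {'local0': {'vote': True}, 'local1': {'vote': False}, 'local2': {'vote': False}, 'local3': {'vote': False}},
    {'local0': {'vote': False}, 'local1': {'vote': True}, 'local2': {'vote': True}, 'local3': {'vote': True}},
    {'local0': {'vote': True}, 'local1': {'vote': True}, 'local2': {'vote': True}, 'local3': {'vote': True}},
]
vote_ballot = [tally(r) for r in rounds]
print(vote_ballot)  # [[1, 3], [3, 1], [4, 0]]
```

The votes True, False, True match local0's first three `[OUTPUT]` records. In the real run the data file holds 51 values, so `jump_to_next` stays False for those first rounds.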
### Development notes: * Make sure you have: * **docker** installed and running. * **nodejs** installed. * **coinstac-simulator** package installed. `npm install --global coinstac-simulator` * Must set `debug=False` while deploying. * Backward compatible to the older library(compspecVersion=1): * Add the following snippet at the end of local and remote pipeline scripts. ```python if __name__ == "__main__": local.to_stdout() ``` * Use [version 1.0](./examples/basic/compspecv1.json) compspec format. * Comment out line `CMD ["python", "entry.py"]` in the `Dockerfile`. * You can also use a **remote debugger** in pycharm as [here](https://www.jetbrains.com/help/pycharm/remote-debugging-with-product.html#remote-debug-config). ### Thanks! %package -n python3-coinstac-computation Summary: Generic computation implementation on COINSTAC. Provides: python-coinstac-computation BuildRequires: python3-devel BuildRequires: python3-setuptools BuildRequires: python3-pip %description -n python3-coinstac-computation #### **ComputationPhase** signature: ```python from coinstac_computation import ComputationPhase class PhaseLoadData(ComputationPhase): def _initialize(self): """Put anything that needs to be initialized only once here""" pass def compute(self): out = {} """To end multi-iterative phase, and go to the next phase, in local or remote set:""" out['jump_to_next'] = True """To stop the computation, In remote set:""" out['success'] = True return out ``` ### Example: Gather max even numbers from each site #### A full working use case is in the [examples/basic](./examples/basic) directory where: * Local sites filters out even numbers and sends to the remote. * Remote finds the max across sites and returns the final result to each of the sites. * Sites save final result. 
#### inputspec.json data: ```json [ { "data": { "value": [10, 3, 5, 6, 7, 8, 12, 38, 32, 789, 776, 441] } }, { "data": { "value": [12, 33, 88, 61, 37, 58, 103, 3386, 312, 9, 77, 41] } } ] ``` ### Local node pipeline: ```python import os from coinstac_computation import COINSTACPyNode, ComputationPhase class PhaseLoadData(ComputationPhase): def compute(self): data = [] for d in self.input['data']: if d % 2 == 0: data.append(d) return {'filtered_data': data} class PhaseSaveResult(ComputationPhase): def compute(self): with open(f"{self.out_dir + os.sep + 'results.txt'}", 'w') as out: out.write(f"{self.input['aggregated_data']}") local = COINSTACPyNode(mode='local', debug=True) local.add_phase(PhaseLoadData) local.add_phase(PhaseSaveResult) ``` ### Remote node pipeline: ```python from coinstac_computation import COINSTACPyNode, ComputationPhase, PhaseEndWithSuccess class PhaseCollectMaxEvenData(ComputationPhase): def compute(self): data = [] for site, site_vars in self.input.items(): site_max = max(site_vars['filtered_data']) data.append(site_max) return {'aggregated_data': data} remote = COINSTACPyNode(mode='remote', debug=True) remote.add_phase(PhaseCollectMaxEvenData) remote.add_phase(PhaseEndWithSuccess) ``` ### Entry point: ```python import coinstac from local_pipeline import local from remote_pipeline import remote coinstac.start(local, remote) ``` ### Run: ``` cd examples/basic/ ~/coinstac-computation/examples/basic/$ docker build -t base . && coinstac-simulator ```
### Advanced use cases: #### 1. Multiple local <---> remote iterations [example](./examples/multi_iterations): * Each sites cast a vote(positive vote if number is even) for multiple(default=51) times. * Remote gathers the votes and returns the final voting result to all sites at the end. * Sites save the final result. #### Overview: 1. Specify when to end the iterative phase with a phase jump signal as `jump_to_next=True`: ```python class PhaseSubmitVote(ComputationPhase): def _initialize(self): """This method runs only once""" self.cache['data_index'] = 0 self.cache['data'] = [] for line in open(self.base_dir + os.sep + self.input_args['data_source']).readlines(): self.cache['data'].append(float(line.strip())) def compute(self): out = { 'vote': self.cache['data'][self.cache['data_index']] % 2 == 0, } self.cache['data_index'] += 1 """Send a jump to next phase signal""" out['jump_to_next'] = self.cache['data_index'] > len(self.cache['data']) - 1 return out ``` 2. Add the phase as multi-iterations: ```python local.add_phase(PhaseSubmitVote, multi_iterations=True) ``` #### 2. Send data across local <---> remote [example](./examples/data_transfer): To make it simple, we send a matrix of size 1000 by 1000 to remote, aggregate it by averaging, and return to each site. 
Local: ```python import os from coinstac_computation import COINSTACPyNode, ComputationPhase import numpy as np class PhaseLoadData(ComputationPhase): def compute(self): out = {} data = np.random.randn(*self.input['matrix_shape']) out.update(**self.send("site_matrix", data)) return out class PhaseSaveResult(ComputationPhase): def compute(self): data = self.recv('averaged_matrix') np.save(self.out_dir + os.sep + "averaged_matrix.npy", data) local = COINSTACPyNode(mode='local', debug=True) local.add_phase(PhaseLoadData) local.add_phase(PhaseSaveResult) ``` Remote: ```python from coinstac_computation import COINSTACPyNode, ComputationPhase, PhaseEndWithSuccess import numpy as np class PhaseAggregateMatrix(ComputationPhase): def compute(self): out = {} data = self.recv("site_matrix") mean_data = np.array(data).mean(0) out.update(**self.send("averaged_matrix", mean_data)) return out remote = COINSTACPyNode(mode='remote', debug=True) remote.add_phase(PhaseAggregateMatrix) remote.add_phase(PhaseEndWithSuccess) ```
### Sample logs from local0 ``` [INPUT] 14:27:36 02/04/2022 ->{'data_source': 'data_file.txt'} [CACHE] 14:27:36 02/04/2022 ->{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 0, 'PhaseSaveResult': 0}}} <-{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 1, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 1, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True} [OUTPUT] 14:27:36 02/04/2022 <-{'output': {'vote': True, 'jump_to_next': False}} [INPUT] 14:27:37 02/04/2022 ->{} [CACHE] 14:27:37 02/04/2022 ->{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 1, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 1, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True} <-{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 2, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 2, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 
79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True} [OUTPUT] 14:27:37 02/04/2022 <-{'output': {'vote': False, 'jump_to_next': False}} [INPUT] 14:27:37 02/04/2022 ->{} [CACHE] 14:27:37 02/04/2022 ->{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 2, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 2, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True} <-{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 3, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 3, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True} [OUTPUT] 14:27:37 02/04/2022 <-{'output': {'vote': True, 'jump_to_next': False}} ``` ### Sample logs from remote ``` [INPUT] 14:27:37 02/04/2022 ->{'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}} [CACHE] 14:27:37 02/04/2022 ->{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 0, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}} <-{'PIPELINE:REMOTE': {'index': 0, 'iterations': 
{'PhaseCollectVote': 1, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3]], 'PHASE:PhaseCollectVote': True} [OUTPUT] 14:27:37 02/04/2022 <-{'output': {}, 'success': False} [INPUT] 14:27:37 02/04/2022 ->{'local0': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': True, 'jump_to_next': False}, 'local3': {'vote': True, 'jump_to_next': False}} [CACHE] 14:27:37 02/04/2022 ->{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 1, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3]], 'PHASE:PhaseCollectVote': True} <-{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 2, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3], [3, 1]], 'PHASE:PhaseCollectVote': True} [OUTPUT] 14:27:37 02/04/2022 <-{'output': {}, 'success': False} [INPUT] 14:27:37 02/04/2022 ->{'local0': {'vote': True, 'jump_to_next': False}, 'local3': {'vote': True, 'jump_to_next': False}, 'local2': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': True, 'jump_to_next': False}} [CACHE] 14:27:37 02/04/2022 ->{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 2, 
'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3], [3, 1]], 'PHASE:PhaseCollectVote': True} <-{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 3, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3], [3, 1], [4, 0]], 'PHASE:PhaseCollectVote': True} [OUTPUT] 14:27:37 02/04/2022 <-{'output': {}, 'success': False} ```
### Development notes: * Make sure you have: * **docker** installed and running. * **nodejs** installed. * **coinstac-simulator** package installed. `npm install --global coinstac-simulator` * Must set `debug=False` while deploying. * Backward compatible to the older library(compspecVersion=1): * Add the following snippet at the end of local and remote pipeline scripts. ```python if __name__ == "__main__": local.to_stdout() ``` * Use [version 1.0](./examples/basic/compspecv1.json) compspec format. * Comment out line `CMD ["python", "entry.py"]` in the `Dockerfile`. * You can also use a **remote debugger** in pycharm as [here](https://www.jetbrains.com/help/pycharm/remote-debugging-with-product.html#remote-debug-config). ### Thanks! %package help Summary: Development documents and examples for coinstac-computation Provides: python3-coinstac-computation-doc %description help #### **ComputationPhase** signature: ```python from coinstac_computation import ComputationPhase class PhaseLoadData(ComputationPhase): def _initialize(self): """Put anything that needs to be initialized only once here""" pass def compute(self): out = {} """To end multi-iterative phase, and go to the next phase, in local or remote set:""" out['jump_to_next'] = True """To stop the computation, In remote set:""" out['success'] = True return out ``` ### Example: Gather max even numbers from each site #### A full working use case is in the [examples/basic](./examples/basic) directory where: * Local sites filters out even numbers and sends to the remote. * Remote finds the max across sites and returns the final result to each of the sites. * Sites save final result. 
#### inputspec.json data: ```json [ { "data": { "value": [10, 3, 5, 6, 7, 8, 12, 38, 32, 789, 776, 441] } }, { "data": { "value": [12, 33, 88, 61, 37, 58, 103, 3386, 312, 9, 77, 41] } } ] ``` ### Local node pipeline: ```python import os from coinstac_computation import COINSTACPyNode, ComputationPhase class PhaseLoadData(ComputationPhase): def compute(self): data = [] for d in self.input['data']: if d % 2 == 0: data.append(d) return {'filtered_data': data} class PhaseSaveResult(ComputationPhase): def compute(self): with open(f"{self.out_dir + os.sep + 'results.txt'}", 'w') as out: out.write(f"{self.input['aggregated_data']}") local = COINSTACPyNode(mode='local', debug=True) local.add_phase(PhaseLoadData) local.add_phase(PhaseSaveResult) ``` ### Remote node pipeline: ```python from coinstac_computation import COINSTACPyNode, ComputationPhase, PhaseEndWithSuccess class PhaseCollectMaxEvenData(ComputationPhase): def compute(self): data = [] for site, site_vars in self.input.items(): site_max = max(site_vars['filtered_data']) data.append(site_max) return {'aggregated_data': data} remote = COINSTACPyNode(mode='remote', debug=True) remote.add_phase(PhaseCollectMaxEvenData) remote.add_phase(PhaseEndWithSuccess) ``` ### Entry point: ```python import coinstac from local_pipeline import local from remote_pipeline import remote coinstac.start(local, remote) ``` ### Run: ``` cd examples/basic/ ~/coinstac-computation/examples/basic/$ docker build -t base . && coinstac-simulator ```
### Advanced use cases: #### 1. Multiple local <---> remote iterations [example](./examples/multi_iterations): * Each sites cast a vote(positive vote if number is even) for multiple(default=51) times. * Remote gathers the votes and returns the final voting result to all sites at the end. * Sites save the final result. #### Overview: 1. Specify when to end the iterative phase with a phase jump signal as `jump_to_next=True`: ```python class PhaseSubmitVote(ComputationPhase): def _initialize(self): """This method runs only once""" self.cache['data_index'] = 0 self.cache['data'] = [] for line in open(self.base_dir + os.sep + self.input_args['data_source']).readlines(): self.cache['data'].append(float(line.strip())) def compute(self): out = { 'vote': self.cache['data'][self.cache['data_index']] % 2 == 0, } self.cache['data_index'] += 1 """Send a jump to next phase signal""" out['jump_to_next'] = self.cache['data_index'] > len(self.cache['data']) - 1 return out ``` 2. Add the phase as multi-iterations: ```python local.add_phase(PhaseSubmitVote, multi_iterations=True) ``` #### 2. Send data across local <---> remote [example](./examples/data_transfer): To make it simple, we send a matrix of size 1000 by 1000 to remote, aggregate it by averaging, and return to each site. 
Local: ```python import os from coinstac_computation import COINSTACPyNode, ComputationPhase import numpy as np class PhaseLoadData(ComputationPhase): def compute(self): out = {} data = np.random.randn(*self.input['matrix_shape']) out.update(**self.send("site_matrix", data)) return out class PhaseSaveResult(ComputationPhase): def compute(self): data = self.recv('averaged_matrix') np.save(self.out_dir + os.sep + "averaged_matrix.npy", data) local = COINSTACPyNode(mode='local', debug=True) local.add_phase(PhaseLoadData) local.add_phase(PhaseSaveResult) ``` Remote: ```python from coinstac_computation import COINSTACPyNode, ComputationPhase, PhaseEndWithSuccess import numpy as np class PhaseAggregateMatrix(ComputationPhase): def compute(self): out = {} data = self.recv("site_matrix") mean_data = np.array(data).mean(0) out.update(**self.send("averaged_matrix", mean_data)) return out remote = COINSTACPyNode(mode='remote', debug=True) remote.add_phase(PhaseAggregateMatrix) remote.add_phase(PhaseEndWithSuccess) ```
### Sample logs from local0
```
[INPUT] 14:27:36 02/04/2022 ->{'data_source': 'data_file.txt'}
[CACHE] 14:27:36 02/04/2022 ->{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 0, 'PhaseSaveResult': 0}}}
<-{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 1, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 1, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True}
[OUTPUT] 14:27:36 02/04/2022 <-{'output': {'vote': True, 'jump_to_next': False}}
[INPUT] 14:27:37 02/04/2022 ->{}
[CACHE] 14:27:37 02/04/2022 ->{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 1, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 1, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True}
<-{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 2, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 2, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True}
[OUTPUT] 14:27:37 02/04/2022 <-{'output': {'vote': False, 'jump_to_next': False}}
[INPUT] 14:27:37 02/04/2022 ->{}
[CACHE] 14:27:37 02/04/2022 ->{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 2, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 2, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True}
<-{'PIPELINE:LOCAL': {'index': 0, 'iterations': {'PhaseSubmitVote': 3, 'PhaseSaveResult': 0}}, 'input_args': {'data_source': 'data_file.txt'}, 'next_phase': 'PhaseSubmitVote', 'data_index': 3, 'data': [712.0, 309.0, 574.0, 838.0, 296.0, 349.0, 781.0, 749.0, 360.0, 702.0, 253.0, 831.0, 911.0, 14.0, 259.0, 805.0, 494.0, 501.0, 549.0, 624.0, 919.0, 836.0, 362.0, 373.0, 563.0, 134.0, 610.0, 875.0, 328.0, 299.0, 874.0, 387.0, 743.0, 233.0, 834.0, 870.0, 685.0, 342.0, 79.0, 270.0, 314.0, 42.0, 364.0, 902.0, 755.0, 248.0, 815.0, 4.0, 21.0, 423.0, 302.0], 'PHASE:PhaseSubmitVote': True}
[OUTPUT] 14:27:37 02/04/2022 <-{'output': {'vote': True, 'jump_to_next': False}}
```

### Sample logs from remote
```
[INPUT] 14:27:37 02/04/2022 ->{'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}
[CACHE] 14:27:37 02/04/2022 ->{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 0, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}}
<-{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 1, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3]], 'PHASE:PhaseCollectVote': True}
[OUTPUT] 14:27:37 02/04/2022 <-{'output': {}, 'success': False}
[INPUT] 14:27:37 02/04/2022 ->{'local0': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': True, 'jump_to_next': False}, 'local3': {'vote': True, 'jump_to_next': False}}
[CACHE] 14:27:37 02/04/2022 ->{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 1, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3]], 'PHASE:PhaseCollectVote': True}
<-{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 2, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3], [3, 1]], 'PHASE:PhaseCollectVote': True}
[OUTPUT] 14:27:37 02/04/2022 <-{'output': {}, 'success': False}
[INPUT] 14:27:37 02/04/2022 ->{'local0': {'vote': True, 'jump_to_next': False}, 'local3': {'vote': True, 'jump_to_next': False}, 'local2': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': True, 'jump_to_next': False}}
[CACHE] 14:27:37 02/04/2022 ->{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 2, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3], [3, 1]], 'PHASE:PhaseCollectVote': True}
<-{'PIPELINE:REMOTE': {'index': 0, 'iterations': {'PhaseCollectVote': 3, 'PhaseSendGlobalResults': 0, 'PhaseEndWithSuccess': 0}}, 'input_args': {'local0': {'vote': True, 'jump_to_next': False}, 'local1': {'vote': False, 'jump_to_next': False}, 'local2': {'vote': False, 'jump_to_next': False}, 'local3': {'vote': False, 'jump_to_next': False}}, 'next_phase': 'PhaseCollectVote', 'vote_ballot': [[1, 3], [3, 1], [4, 0]], 'PHASE:PhaseCollectVote': True}
[OUTPUT] 14:27:37 02/04/2022 <-{'output': {}, 'success': False}
```
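In the remote logs above, each new `vote_ballot` entry appears to be a `[True count, False count]` tally of the site votes received in that iteration. Only `local0` votes `True` in the first round, giving `[1, 3]`, and all four sites agree in the third round, giving `[4, 0]`. This is inferred from the sample logs, not read from the library source. The sketch below replays those three inputs through a hypothetical `tally_votes` helper, which is not part of `coinstac_computation`:

```python
def tally_votes(site_outputs):
    """Hypothetical helper (not a coinstac_computation API).

    `site_outputs` mirrors the remote's [INPUT] payload, e.g.
    {'local0': {'vote': True, 'jump_to_next': False}, ...}.
    Returns [number_of_true_votes, number_of_false_votes], the shape
    the `vote_ballot` entries in the sample logs appear to have.
    """
    yes = sum(1 for site in site_outputs.values() if site['vote'])
    return [yes, len(site_outputs) - yes]


# The three remote [INPUT] payloads from the sample logs,
# trimmed to the 'vote' key for brevity.
inputs = [
    {'local0': {'vote': True}, 'local1': {'vote': False},
     'local2': {'vote': False}, 'local3': {'vote': False}},
    {'local0': {'vote': False}, 'local2': {'vote': True},
     'local1': {'vote': True}, 'local3': {'vote': True}},
    {'local0': {'vote': True}, 'local3': {'vote': True},
     'local2': {'vote': True}, 'local1': {'vote': True}},
]

# Accumulate one tally per iteration, like the cached 'vote_ballot' list.
vote_ballot = []
for payload in inputs:
    vote_ballot.append(tally_votes(payload))

print(vote_ballot)  # [[1, 3], [3, 1], [4, 0]]
```

The printed list matches the final cached `vote_ballot` in the remote logs.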
### Development notes:
* Make sure you have:
  * **docker** installed and running.
  * **nodejs** installed.
  * the **coinstac-simulator** package installed: `npm install --global coinstac-simulator`
* Set `debug=False` before deploying.
* Backward compatibility with the older library (compspecVersion=1):
  * Add the following snippet at the end of the local and remote pipeline scripts (in the remote script, call `remote.to_stdout()` instead):
    ```python
    if __name__ == "__main__":
        local.to_stdout()
    ```
  * Use the [version 1.0](./examples/basic/compspecv1.json) compspec format.
  * Comment out the line `CMD ["python", "entry.py"]` in the `Dockerfile`.
* You can also use a **remote debugger** in PyCharm, as described [here](https://www.jetbrains.com/help/pycharm/remote-debugging-with-product.html#remote-debug-config).

### Thanks!

%prep
%autosetup -n coinstac-computation-1.0.2

%build
%py3_build

%install
%py3_install
install -d -m755 %{buildroot}/%{_pkgdocdir}
if [ -d doc ]; then cp -arf doc %{buildroot}/%{_pkgdocdir}; fi
if [ -d docs ]; then cp -arf docs %{buildroot}/%{_pkgdocdir}; fi
if [ -d example ]; then cp -arf example %{buildroot}/%{_pkgdocdir}; fi
if [ -d examples ]; then cp -arf examples %{buildroot}/%{_pkgdocdir}; fi
pushd %{buildroot}
if [ -d usr/lib ]; then
	find usr/lib -type f -printf "\"/%h/%f\"\n" >> filelist.lst
fi
if [ -d usr/lib64 ]; then
	find usr/lib64 -type f -printf "\"/%h/%f\"\n" >> filelist.lst
fi
if [ -d usr/bin ]; then
	find usr/bin -type f -printf "\"/%h/%f\"\n" >> filelist.lst
fi
if [ -d usr/sbin ]; then
	find usr/sbin -type f -printf "\"/%h/%f\"\n" >> filelist.lst
fi
touch doclist.lst
if [ -d usr/share/man ]; then
	find usr/share/man -type f -printf "\"/%h/%f.gz\"\n" >> doclist.lst
fi
popd
mv %{buildroot}/filelist.lst .
mv %{buildroot}/doclist.lst .

%files -n python3-coinstac-computation -f filelist.lst
%dir %{python3_sitelib}/*

%files help -f doclist.lst
%{_docdir}/*

%changelog
* Fri Jun 09 2023 Python_Bot - 1.0.2-1
- Package Spec generated