123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- # coding: utf-8
- from synergine2.config import Config
- from synergine2.cycle import CycleManager
- from synergine2.log import SynergineLogger
- from synergine2.share import shared
- from synergine2.simulation import Simulation
- from synergine2.simulation import SimulationBehaviour
- from synergine2.simulation import SimulationMechanism
- from synergine2.simulation import Event
- from synergine2.simulation import Subject
- from synergine2.simulation import Subjects
- from synergine2.simulation import SubjectMechanism
- from synergine2.simulation import SubjectBehaviour
- from tests import BaseTest
-
-
- class MyEvent(Event):
- def __init__(self, value):
- self.value = value
-
-
- class MySubjectMechanism(SubjectMechanism):
- def run(self):
- return 42
-
-
- class MySubjectBehavior(SubjectBehaviour):
- use = [MySubjectMechanism]
-
- def run(self, data):
- if MySubjectMechanism in data and data[MySubjectMechanism] == 42:
- return self.subject.id
-
- def action(self, data) -> [Event]:
- return [MyEvent(data * 2)]
-
-
- class MySubject(Subject):
- behaviours_classes = [MySubjectBehavior]
-
-
- class MySubjects(Subjects):
- pass
-
-
- class MySimulationMechanism(SimulationMechanism):
- def run(self, process_number: int = None, process_count: int = None):
- return process_number + 1000
-
-
- class MySimulationBehaviour(SimulationBehaviour):
- use = [MySimulationMechanism]
-
- @classmethod
- def merge_data(cls, new_data, start_data=None):
- start_data = start_data or 0
- return start_data + new_data
-
- def run(self, data):
- return data[MySimulationMechanism] * 2
-
- def action(self, data) -> [Event]:
- return [MyEvent(data)]
-
-
- class MySimulation(Simulation):
- behaviours_classes = [MySimulationBehaviour]
-
-
- class TestCycle(BaseTest):
- def test_subjects_cycle(self):
- shared.reset()
- config = Config({'core': {'use_x_cores': 2}})
- logger = SynergineLogger(name='test')
-
- simulation = Simulation(config)
- subjects = MySubjects(simulation=simulation)
- simulation.subjects = subjects
-
- for i in range(3):
- subjects.append(MySubject(config, simulation=simulation))
-
- cycle_manager = CycleManager(
- config=config,
- logger=logger,
- simulation=simulation,
- )
-
- events = cycle_manager.next()
- cycle_manager.stop()
-
- assert 3 == len(events)
- event_values = [e.value for e in events]
- assert all([s.id * 2 in event_values for s in subjects])
-
- def test_new_subject(self):
- shared.reset()
- config = Config({'core': {'use_x_cores': 1}})
- logger = SynergineLogger(name='test')
-
- simulation = Simulation(config)
- subjects = MySubjects(simulation=simulation)
- simulation.subjects = subjects
-
- for i in range(3):
- subjects.append(MySubject(config, simulation=simulation))
-
- cycle_manager = CycleManager(
- config=config,
- logger=logger,
- simulation=simulation,
- )
-
- events = cycle_manager.next()
-
- assert 3 == len(events)
- event_values = [e.value for e in events]
- assert all([s.id * 2 in event_values for s in subjects])
-
- subjects.append(MySubject(config, simulation=simulation))
- events = cycle_manager.next()
- cycle_manager.stop()
-
- assert 4 == len(events)
- event_values = [e.value for e in events]
- assert all([s.id * 2 in event_values for s in subjects])
-
- def test_simulation_events(self):
- shared.reset()
- config = Config({'core': {'use_x_cores': 2}})
- logger = SynergineLogger(name='test')
-
- simulation = MySimulation(config)
-
- subjects = MySubjects(simulation=simulation)
- simulation.subjects = subjects
-
- cycle_manager = CycleManager(
- config=config,
- logger=logger,
- simulation=simulation,
- )
-
- events = cycle_manager.next()
- cycle_manager.stop()
-
- assert 1 == len(events)
- assert events[0].value == 4002
|