test_cycle.py 4.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. # coding: utf-8
  2. from synergine2.config import Config
  3. from synergine2.cycle import CycleManager
  4. from synergine2.log import SynergineLogger
  5. from synergine2.share import shared
  6. from synergine2.simulation import Simulation
  7. from synergine2.simulation import SimulationBehaviour
  8. from synergine2.simulation import SimulationMechanism
  9. from synergine2.simulation import Event
  10. from synergine2.simulation import Subject
  11. from synergine2.simulation import Subjects
  12. from synergine2.simulation import SubjectMechanism
  13. from synergine2.simulation import SubjectBehaviour
  14. from tests import BaseTest
  15. class MyEvent(Event):
  16. def __init__(self, value):
  17. self.value = value
  18. class MySubjectMechanism(SubjectMechanism):
  19. def run(self):
  20. return 42
  21. class MySubjectBehavior(SubjectBehaviour):
  22. use = [MySubjectMechanism]
  23. def run(self, data):
  24. class_name = MySubjectMechanism.__name__
  25. if class_name in data and data[class_name] == 42:
  26. return self.subject.id
  27. def action(self, data) -> [Event]:
  28. return [MyEvent(data * 2)]
  29. class MySubject(Subject):
  30. behaviours_classes = [MySubjectBehavior]
  31. class MySubjects(Subjects):
  32. pass
  33. class MySimulationMechanism(SimulationMechanism):
  34. def run(self, process_number: int = None, process_count: int = None):
  35. return process_number + 1000
  36. class MySimulationBehaviour(SimulationBehaviour):
  37. use = [MySimulationMechanism]
  38. @classmethod
  39. def merge_data(cls, new_data, start_data=None):
  40. start_data = start_data or 0
  41. return start_data + new_data
  42. def run(self, data):
  43. return data['MySimulationMechanism'] * 2
  44. def action(self, data) -> [Event]:
  45. return [MyEvent(data)]
  46. class MySimulation(Simulation):
  47. behaviours_classes = [MySimulationBehaviour]
  48. class TestCycle(BaseTest):
  49. def test_subjects_cycle(self):
  50. shared.reset()
  51. config = Config({'core': {'use_x_cores': 2}})
  52. logger = SynergineLogger(name='test')
  53. simulation = Simulation(config)
  54. subjects = MySubjects(simulation=simulation)
  55. simulation.subjects = subjects
  56. # Prepare simulation class index
  57. simulation.add_to_index(MySubjectBehavior)
  58. simulation.add_to_index(MySubjectMechanism)
  59. simulation.add_to_index(MySubject)
  60. for i in range(3):
  61. subjects.append(MySubject(config, simulation=simulation))
  62. cycle_manager = CycleManager(
  63. config=config,
  64. logger=logger,
  65. simulation=simulation,
  66. )
  67. events = cycle_manager.next()
  68. cycle_manager.stop()
  69. assert 3 == len(events)
  70. event_values = [e.value for e in events]
  71. assert all([s.id * 2 in event_values for s in subjects])
  72. def test_new_subject(self):
  73. shared.reset()
  74. config = Config({'core': {'use_x_cores': 1}})
  75. logger = SynergineLogger(name='test')
  76. simulation = Simulation(config)
  77. subjects = MySubjects(simulation=simulation)
  78. simulation.subjects = subjects
  79. # Prepare simulation class index
  80. simulation.add_to_index(MySubjectBehavior)
  81. simulation.add_to_index(MySubjectMechanism)
  82. simulation.add_to_index(MySubject)
  83. for i in range(3):
  84. subjects.append(MySubject(config, simulation=simulation))
  85. cycle_manager = CycleManager(
  86. config=config,
  87. logger=logger,
  88. simulation=simulation,
  89. )
  90. events = cycle_manager.next()
  91. assert 3 == len(events)
  92. event_values = [e.value for e in events]
  93. assert all([s.id * 2 in event_values for s in subjects])
  94. subjects.append(MySubject(config, simulation=simulation))
  95. events = cycle_manager.next()
  96. cycle_manager.stop()
  97. assert 4 == len(events)
  98. event_values = [e.value for e in events]
  99. assert all([s.id * 2 in event_values for s in subjects])
  100. def test_simulation_events(self):
  101. shared.reset()
  102. config = Config({'core': {'use_x_cores': 2}})
  103. logger = SynergineLogger(name='test')
  104. simulation = MySimulation(config)
  105. # Prepare simulation class index
  106. simulation.add_to_index(MySimulationBehaviour, MySimulationMechanism)
  107. subjects = MySubjects(simulation=simulation)
  108. simulation.subjects = subjects
  109. cycle_manager = CycleManager(
  110. config=config,
  111. logger=logger,
  112. simulation=simulation,
  113. )
  114. events = cycle_manager.next()
  115. cycle_manager.stop()
  116. assert 1 == len(events)
  117. assert events[0].value == 4002