123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493 |
- # coding: utf-8
- import typing
-
- import time
-
- from synergine2.base import BaseObject
- from synergine2.base import IdentifiedObject
- from synergine2.config import Config
- from synergine2.exceptions import ConfigurationError
- from synergine2.share import shared
- from synergine2.utils import get_mechanisms_classes
-
-
- class Intention(object):
- pass
-
-
- class IntentionManager(object):
- intentions = shared.create_self('intentions', lambda: {}) # type: typing.Dict[typing.Type[Intention], Intention]
-
- def __init__(self):
- self._id = id(self)
-
- @property
- def id(self) -> int:
- return self._id
-
- def set(self, intention: Intention) -> None:
- self.intentions[type(intention)] = intention
-
- def get(self, intention_type: typing.Type[Intention]) -> Intention:
- # TODO: Raise specialised exception if KeyError
- return self.intentions[intention_type]
-
- def remove(self, intention_type: typing.Type[Intention]) -> None:
- intentions = self.intentions
- del intentions[intention_type]
- self.intentions = intentions
-
- def remove_all(self) -> None:
- self.intentions = {}
-
-
- class Subject(IdentifiedObject):
- start_collections = []
- behaviours_classes = []
- behaviour_selector_class = None # type: typing.Type[SubjectBehaviourSelector]
- intention_manager_class = None # type: typing.Type[IntentionManager]
- collections = shared.create_self('collections', lambda: [])
-
- def __init__(
- self,
- config: Config,
- simulation: 'Simulation',
- properties: dict=None,
- ):
- """
-
- :param config: config object
- :param simulation: simulation object
- :param properties: additional data (will not change during simulation)
- """
- super().__init__()
- # FIXME: use shared data to permit dynamic start_collections
- self.collections.extend(self.start_collections[:])
-
- self.config = config
- self._id = id(self) # We store object id because it's lost between process
- self.simulation = simulation
- self.intentions = None
- self.properties = properties or {}
-
- if self.behaviour_selector_class:
- self.behaviour_selector = self.behaviour_selector_class()
- else:
- self.behaviour_selector = SubjectBehaviourSelector()
-
- if self.intention_manager_class:
- self.intentions = self.intention_manager_class()
- else:
- self.intentions = IntentionManager()
-
- self._mechanisms = None # type: typing.Dict[typing.Type['SubjectMechanism'], 'SubjectMechanism']
- self._behaviours = None # type: typing.Dict[typing.Type['SubjectBehaviour'], 'SubjectBehaviour']
-
- @property
- def mechanisms(self) -> typing.Dict[typing.Type['SubjectMechanism'], 'SubjectMechanism']:
- if self._mechanisms is None:
- self._mechanisms = {}
- for behaviour_class in self.behaviours_classes:
- for mechanism_class in behaviour_class.use:
- mechanism = mechanism_class(
- self.config,
- self.simulation,
- self,
- )
- self._mechanisms[mechanism_class] = mechanism
- return self._mechanisms
-
- @property
- def behaviours(self) -> typing.Dict[typing.Type['SubjectBehaviour'], 'SubjectBehaviour']:
- if self._behaviours is None:
- self._behaviours = {}
- for behaviour_class in self.behaviours_classes:
- behaviour = behaviour_class(
- self.config,
- self.simulation,
- self,
- )
- self._behaviours[behaviour_class] = behaviour
- return self._behaviours
-
- def change_id(self, id_: int) -> None:
- self._id = id_
-
- def expose(self) -> None:
- subject_classes = shared.get('subject_classes')
- subject_classes[self._id] = type(self)
- shared.set('subject_classes', subject_classes)
-
- for collection in self.collections:
- self.simulation.collections.setdefault(collection, []).append(self.id)
-
- def remove_collection(self, collection_name: str) -> None:
- self.collections.remove(collection_name)
- # Manipulate as shared property
- simulation_collection = self.simulation.collections[collection_name]
- simulation_collection.remove(self.id)
- self.simulation.collections[collection_name] = simulation_collection
-
- def __str__(self):
- return self.__repr__()
-
- def __repr__(self):
- return '{}({})'.format(
- type(self).__name__,
- self.id,
- )
-
-
- class Subjects(list):
- """
- TODO: Manage other list methods
- """
- subject_ids = shared.create('subject_ids', [])
-
- def __init__(self, *args, **kwargs):
- self.simulation = kwargs.pop('simulation')
- self.removes = []
- self.adds = []
- self.track_changes = False
- self.index = {}
- self._auto_expose = True
- super().__init__(*args, **kwargs)
-
- if self.auto_expose:
- for subject in self:
- subject.expose()
-
- @property
- def auto_expose(self) -> bool:
- return self._auto_expose
-
- @auto_expose.setter
- def auto_expose(self, value: bool) -> None:
- assert self._auto_expose
- self._auto_expose = value
-
- def remove(self, value: Subject):
- # Remove from index
- del self.index[value.id]
- self.subject_ids.remove(value.id)
- # Remove from subjects list
- super().remove(value)
- # Remove from start_collections
- for collection_name in value.collections:
- self.simulation.collections[collection_name].remove(value.id)
- # Add to removed listing
- if self.track_changes:
- self.removes.append(value)
- # TODO: Supprimer des choses du shared ! Sinon fuite mémoire dans la bdd
-
- def append(self, p_object):
- # Add to index
- self.index[p_object.id] = p_object
- self.subject_ids.append(p_object.id)
- # Add to subjects list
- super().append(p_object)
- # Add to adds list
- if self.track_changes:
- self.adds.append(p_object)
- if self.auto_expose:
- p_object.expose()
-
- def extend(self, iterable):
- for item in iterable:
- self.append(item)
-
-
- class Simulation(BaseObject):
- accepted_subject_class = Subjects
- behaviours_classes = [] # type: typing.List[typing.Type[SimulationBehaviour]]
-
- subject_classes = shared.create('subject_classes', {})
- collections = shared.create('collections', {})
-
- def __init__(
- self,
- config: Config,
- ):
- self.config = config
- self._subjects = None # type: Subjects
-
- self._index_locked = False
-
- self.behaviours = {}
- self.mechanisms = {}
-
- for mechanism_class in get_mechanisms_classes(self):
- self.mechanisms[mechanism_class] = mechanism_class(
- config=self.config,
- simulation=self,
- )
-
- for behaviour_class in self.behaviours_classes:
- self.behaviours[behaviour_class] = behaviour_class(
- config=self.config,
- simulation=self,
- )
-
- def lock_index(self) -> None:
- self._index_locked = True
-
- @property
- def subjects(self):
- return self._subjects
-
- @subjects.setter
- def subjects(self, value: 'Subjects'):
- if not isinstance(value, self.accepted_subject_class):
- raise Exception('Simulation.subjects must be {0} type'.format(
- self.accepted_subject_class,
- ))
- self._subjects = value
-
- def get_or_create_subject(self, subject_id: int) -> Subject:
- try:
- return self._subjects.index[subject_id]
- except KeyError:
- # We should be in process context and subject have to been created
- subject_class = shared.get('subject_classes')[subject_id]
- subject = subject_class(self.config, self)
- subject.change_id(subject_id)
- self.subjects.append(subject)
- return subject
-
-
- class Mechanism(BaseObject):
- pass
-
-
- class SubjectMechanism(Mechanism):
- def __init__(
- self,
- config: Config,
- simulation: Simulation,
- subject: Subject,
- ):
- self.config = config
- self.simulation = simulation
- self.subject = subject
-
- def run(self) -> dict:
- raise NotImplementedError()
-
-
- class SimulationMechanism(Mechanism):
- """If parallelizable behaviour, call """
- parallelizable = False
-
- def __init__(
- self,
- config: Config,
- simulation: Simulation,
- ):
- self.config = config
- self.simulation = simulation
-
- def repr_debug(self) -> str:
- return self.__class__.__name__
-
- def run(self, process_number: int=None, process_count: int=None):
- raise NotImplementedError()
-
-
- class Event(BaseObject):
- def repr_debug(self) -> str:
- return self.__dict__
-
-
- class Behaviour(BaseObject):
- def __init__(self):
- self.last_execution_time = 0
-
- @property
- def cycle_frequency(self) -> typing.Optional[float]:
- return None
-
- @property
- def seconds_frequency(self) -> typing.Optional[float]:
- """
- If this behaviour is time based, return here the waiting time between two
- executions. IMPORTANT: your behaviour have to update it's
- self.last_execution_time attribute when executed (in `run` method)!
- :return: float number of period in seconds
- """
- return None
-
- def run(self, data):
- raise NotImplementedError()
-
- def is_skip(self, cycle_number: int) -> bool:
- """
- :return: True if behaviour have to be skip this time
- """
- cycle_frequency = self.cycle_frequency
- if cycle_frequency is not None:
- return bool(cycle_number % cycle_frequency)
-
- seconds_frequency = self.seconds_frequency
- if seconds_frequency is not None:
- return float(time.time() - self.last_execution_time) <= seconds_frequency
-
- return False
-
-
- class SubjectBehaviour(Behaviour):
- use = [] # type: typing.List[typing.Type[SubjectMechanism]]
-
- def __init__(
- self,
- config: Config,
- simulation: Simulation,
- subject: Subject,
- ):
- super().__init__()
- self.config = config
- self.simulation = simulation
- self.subject = subject
-
- def is_terminated(self) -> bool:
- """
- :return: True if behaviour will no longer exist (can be removed from simulation)
- """
- return False
-
- def run(self, data) -> object:
- """
- Method called in subprocess.
- If return equivalent to False, behaviour produce nothing.
- If return equivalent to True, action bahaviour method
- will be called with these data
- Note: Returned data will be transfered from sub processes.
- Prefer scalar types.
- """
- raise NotImplementedError() # TODO Test it and change to strictly False
-
- def action(self, data) -> [Event]:
- """
- Method called in main process
- Return events will be give to terminals
- """
- raise NotImplementedError()
-
-
- class SimulationBehaviour(Behaviour):
- use = [] # type: typing.List[typing.Type[SimulationMechanism]]
-
- def __init__(
- self,
- config: Config,
- simulation: Simulation,
- ):
- super().__init__()
- self.config = config
- self.simulation = simulation
-
- def run(self, data):
- """
- Method called in subprocess if mechanisms are
- parallelizable, in main process if not.
- """
- raise NotImplementedError()
-
- @classmethod
- def merge_data(cls, new_data, start_data=None):
- """
- Called if behaviour executed in subprocess
- """
- raise NotImplementedError()
-
- def action(self, data) -> typing.List[Event]:
- """
- Method called in main process
- Return events will be give to terminals
- """
- raise NotImplementedError()
-
-
- class SubjectBehaviourSelector(BaseObject):
- def reduce_behaviours(
- self,
- behaviours: typing.Dict[typing.Type[SubjectBehaviour], object],
- ) -> typing.Dict[typing.Type[SubjectBehaviour], object]:
- return behaviours
-
-
- class BehaviourStep(object):
- def proceed(self) -> 'BehaviourStep':
- raise NotImplementedError()
-
- def generate_data(self) -> typing.Any:
- raise NotImplementedError()
-
- def get_events(self) -> typing.List[Event]:
- raise NotImplementedError()
-
-
- class SubjectComposedBehaviour(SubjectBehaviour):
- """
- SubjectComposedBehaviour receive data in run (and will will it's step with it).
- These data can be the first data of behaviour, or last behaviour subject data
- produced in action.
- SubjectComposedBehaviour produce data in run only if something happen and must be
- given in future run.
- """
- step_classes = None # type: typing.List[typing.Tuple[typing.Type[BehaviourStep], typing.Callable[[typing.Any], bool]]] # nopep8
-
- def __init__(
- self,
- config: Config,
- simulation: Simulation,
- subject: Subject,
- ) -> None:
- super().__init__(config, simulation, subject)
- if self.step_classes is None:
- raise ConfigurationError(
- '{}: you must set step_classes class attribute'.format(
- self.__class__.__name__,
- ),
- )
-
- def get_step(self, data) -> BehaviourStep:
- for step_class, step_test_callable in self.step_classes:
- if step_test_callable(data):
- return step_class(**data)
-
- raise ConfigurationError(
- '{}: No step choose for following data: {}'.format(
- self.__class__.__name__,
- data,
- ),
- )
-
- def run(self, data):
- step = self.get_step(data)
- next_step = step.proceed()
- return next_step.generate_data()
-
- def action(self, data):
- step = self.get_step(data)
- return step.get_events()
-
-
- def disable_when(
- predicate: typing.Callable[[Config, Simulation], bool],
- ):
- def decorator(func) -> typing.Any:
- def wrapper(self, *args, **kwargs):
- if predicate(self.config, self.simulation):
- return False
- return func(self, *args, **kwargs)
- return wrapper
- return decorator
-
-
- def config_value(parameter_to_resolve: str):
- def predicate(
- config: Config,
- simulation: Simulation,
- ):
- return bool(config.resolve(parameter_to_resolve, False))
- return predicate
|