share.py 9.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. # coding: utf-8
  2. import pickle
  3. import typing
  4. import redis
  5. from synergine2.base import IdentifiedObject
  6. from synergine2.exceptions import SynergineException
  7. from synergine2.exceptions import UnknownSharedData
  8. if typing.TYPE_CHECKING:
  9. from synergine2.simulation import Subject
  10. class NoSharedDataInstance(SynergineException):
  11. pass
  12. class SharedDataIndex(object):
  13. def __init__(
  14. self,
  15. shared_data_manager: 'SharedDataManager',
  16. key: str,
  17. ) -> None:
  18. self.shared_data_manager = shared_data_manager
  19. self.key = key
  20. def add(self, subject: 'Subject', value: typing.Any) -> None:
  21. raise NotImplementedError()
  22. def remove(self, subject: 'Subject', value: typing.Any) -> None:
  23. raise NotImplementedError()
  24. def get_final_key(self, subject: 'Subject', value: typing.Any) -> str:
  25. return self.key.format(shared_key=value, subject_id=subject.id)
  26. class SharedData(object):
  27. def __init__(
  28. self,
  29. key: str,
  30. self_type: bool=False,
  31. default: typing.Any=None,
  32. ) -> None:
  33. """
  34. :param key: shared data key
  35. :param self_type: if it is a magic shared data where real key is association of key and instance id
  36. :param default: default/initial value to shared data. Can be a callable to return list or dict
  37. """
  38. self._key = key
  39. self.self_type = self_type
  40. self._default = default
  41. self.is_special_type = isinstance(self.default_value, (list, dict))
  42. self.type = type(self.default_value)
  43. if self.is_special_type:
  44. if isinstance(self.default_value, list):
  45. self.special_type = TrackedList
  46. elif isinstance(self.default_value, dict):
  47. self.special_type = TrackedDict
  48. else:
  49. raise NotImplementedError()
  50. def get_final_key(self, instance: IdentifiedObject) -> str:
  51. if self.self_type:
  52. return '{}_{}'.format(instance.id, self._key)
  53. return self._key
  54. @property
  55. def default_value(self) -> typing.Any:
  56. if callable(self._default):
  57. return self._default()
  58. return self._default
  59. class TrackedDict(dict):
  60. base = dict
  61. def __init__(self, seq=None, **kwargs):
  62. self.shared_data = kwargs.pop('shared_data')
  63. self.shared = kwargs.pop('shared')
  64. self.instance = kwargs.pop('instance')
  65. super().__init__(seq, **kwargs)
  66. def __setitem__(self, key, value):
  67. super().__setitem__(key, value)
  68. self.shared.set(self.shared_data.get_final_key(self.instance), dict(self))
  69. def setdefault(self, k, d=None):
  70. v = super().setdefault(k, d)
  71. self.shared.set(self.shared_data.get_final_key(self.instance), dict(self))
  72. return v
  73. # TODO: Cover all methods
  74. class TrackedList(list):
  75. base = list
  76. def __init__(self, seq=(), **kwargs):
  77. self.shared_data = kwargs.pop('shared_data')
  78. self.shared = kwargs.pop('shared')
  79. self.instance = kwargs.pop('instance')
  80. super().__init__(seq)
  81. def append(self, p_object):
  82. super().append(p_object)
  83. self.shared.set(self.shared_data.get_final_key(self.instance), list(self))
  84. def remove(self, object_):
  85. super().remove(object_)
  86. self.shared.set(self.shared_data.get_final_key(self.instance), list(self))
  87. def extend(self, iterable) -> None:
  88. super().extend(iterable)
  89. self.shared.set(self.shared_data.get_final_key(self.instance), list(self))
  90. # TODO: Cover all methods
  91. class SharedDataManager(object):
  92. """
  93. This object is designed to own shared memory between processes. It must be feed (with set method) before
  94. start of processes. Processes will only be able to access shared memory filled here before start.
  95. """
  96. def __init__(self, clear: bool=True):
  97. self._r = redis.StrictRedis(host='localhost', port=6379, db=0) # TODO: configs
  98. self._shared_data_list = [] # type: typing.List[SharedData]
  99. self._data = {}
  100. self._modified_keys = set()
  101. self._default_values = {}
  102. self._special_types = {} # type: typing.Dict[str, typing.Union[typing.Type[TrackedDict], typing.Type[TrackedList]]] # nopep8
  103. if clear:
  104. self.clear()
  105. def clear(self) -> None:
  106. self._r.flushdb()
  107. self._data = {}
  108. self._modified_keys = set()
  109. def reset(self) -> None:
  110. for key, value in self._default_values.items():
  111. self.set(key, value)
  112. self.commit()
  113. self._data = {}
  114. def purge_data(self):
  115. self._data = {}
  116. def set(self, key: str, value: typing.Any) -> None:
  117. # FIXME: Called tout le temps !
  118. self._data[key] = value
  119. self._modified_keys.add(key)
  120. def get(self, key: str) -> typing.Any:
  121. try:
  122. return self._data[key]
  123. except KeyError:
  124. database_value = self._r.get(key)
  125. if database_value is None:
  126. # We not allow None value storage
  127. raise UnknownSharedData('No shared data for key "{}"'.format(key))
  128. value = pickle.loads(database_value)
  129. self._data[key] = value
  130. return self._data[key]
  131. def commit(self) -> None:
  132. for key in self._modified_keys:
  133. value = self.get(key)
  134. self._r.set(key, pickle.dumps(value))
  135. self._modified_keys = set()
  136. def refresh(self) -> None:
  137. self._data = {}
  138. def make_index(
  139. self,
  140. shared_data_index_class: typing.Type[SharedDataIndex],
  141. key: str,
  142. *args: typing.Any,
  143. **kwargs: typing.Any
  144. ) -> SharedDataIndex:
  145. return shared_data_index_class(self, key, *args, **kwargs)
  146. def create_self(
  147. self,
  148. key: str,
  149. default: typing.Any,
  150. indexes: typing.List[SharedDataIndex]=None,
  151. ):
  152. return self.create(key, self_type=True, value=default, indexes=indexes)
  153. def create(
  154. self,
  155. key: str,
  156. value: typing.Any,
  157. self_type: bool=False,
  158. indexes: typing.List[SharedDataIndex]=None,
  159. ):
  160. # TODO: Store all keys and forbid re-use one
  161. indexes = indexes or []
  162. shared_data = SharedData(
  163. key=key,
  164. self_type=self_type,
  165. default=value,
  166. )
  167. self._shared_data_list.append(shared_data)
  168. def fget(instance):
  169. final_key = shared_data.get_final_key(instance)
  170. try:
  171. value_ = self.get(final_key)
  172. if not shared_data.is_special_type:
  173. return value_
  174. else:
  175. return shared_data.special_type(value_, shared_data=shared_data, shared=self, instance=instance)
  176. except UnknownSharedData:
  177. # If no data in database, value for this shared_data have been never set
  178. self.set(final_key, shared_data.default_value)
  179. self._default_values[final_key] = shared_data.default_value
  180. return self.get(final_key)
  181. def fset(instance, value_):
  182. final_key = shared_data.get_final_key(instance)
  183. try:
  184. previous_value = self.get(final_key)
  185. for index in indexes:
  186. index.remove(instance, previous_value)
  187. except UnknownSharedData:
  188. pass # If no shared data, no previous value to remove
  189. if not shared_data.is_special_type:
  190. self.set(final_key, value_)
  191. else:
  192. self.set(final_key, shared_data.type(value_))
  193. for index in indexes:
  194. index.add(instance, value_)
  195. def fdel(self_):
  196. raise SynergineException('You cannot delete a shared data: not implemented yet')
  197. shared_property = property(
  198. fget=fget,
  199. fset=fset,
  200. fdel=fdel,
  201. )
  202. # A simple shared data can be set now because no need to build key with instance id
  203. if not self_type:
  204. self.set(key, shared_data.default_value)
  205. self._default_values[key] = shared_data.default_value
  206. return shared_property
  207. # TODO: Does exist a way to permit overload of SharedDataManager class ?
  208. shared = SharedDataManager()
  209. class SubjectListIndex(SharedDataIndex):
  210. def add(self, subject: 'Subject', value):
  211. final_key = self.get_final_key(subject, value)
  212. try:
  213. values = self.shared_data_manager.get(final_key)
  214. except UnknownSharedData:
  215. values = []
  216. values.append(subject.id)
  217. self.shared_data_manager.set(final_key, values)
  218. def remove(self, subject: 'Subject', value):
  219. final_key = self.get_final_key(subject, value)
  220. values = self.shared_data_manager.get(final_key)
  221. values.remove(subject.id)
  222. self.shared_data_manager.set(final_key, values)
  223. class ListIndex(SharedDataIndex):
  224. def add(self, subject: 'Subject', value):
  225. try:
  226. values = self.shared_data_manager.get(self.key)
  227. except UnknownSharedData:
  228. values = []
  229. values.append(value)
  230. self.shared_data_manager.set(self.key, values)
  231. def remove(self, subject: 'Subject', value):
  232. values = self.shared_data_manager.get(self.key)
  233. values.remove(value)
  234. self.shared_data_manager.set(self.key, values)