microserver.tscn 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. [gd_scene load_steps=2 format=3 uid="uid://cdt4rn31xeo85"]
  2. [sub_resource type="GDScript" id="GDScript_r4hca"]
  3. script/source = "extends Node
  4. class Queue:
  5. extends RefCounted
  6. var _opaque:Array = []
  7. var _mutex:Mutex = Mutex.new()
  8. var _owner
  9. func size() -> int:
  10. var _size:int
  11. _mutex.lock()
  12. _size = _opaque.size()
  13. _mutex.unlock()
  14. return _size
  15. func enqueue(val, check_exist:bool = false) -> bool:
  16. _mutex.lock()
  17. if (!check_exist || !_opaque.has(val)):
  18. _opaque.push_back(val)
  19. _mutex.unlock()
  20. return true
  21. func dequeue() -> Variant:
  22. var ret = null
  23. _mutex.lock()
  24. ret = _opaque.pop_front()
  25. _mutex.unlock()
  26. return ret
  27. func bind_owner(owner:Variant) -> void:
  28. _mutex.lock()
  29. _owner = owner
  30. _mutex.unlock()
  31. func get_owner() -> Variant:
  32. var owner = null
  33. _mutex.lock()
  34. owner = _owner
  35. _mutex.unlock()
  36. return owner
  37. func unbind_owner() -> void:
  38. _mutex.lock()
  39. _owner = null
  40. _mutex.unlock()
  41. class Worker:
  42. extends Thread
  43. var should_working:bool
  44. func _main(global_mq:Queue):
  45. print(\"Worker thread started [id: {0}]\".format([get_id()]))
  46. while should_working:
  47. var mq:Queue = global_mq.dequeue()
  48. if null == mq:
  49. OS.delay_msec(0)
  50. else:
  51. var msg = mq.dequeue()
  52. if null == msg:
  53. OS.delay_msec(0)
  54. else:
  55. var callback = mq.get_owner()
  56. if null == callback:
  57. continue
  58. else:
  59. callback.call(msg)
  60. global_mq.enqueue(mq)
  61. print(\"Worker thread stopped [id: {0}]\".format([get_id()]))
  62. func start_working(owner) -> void:
  63. should_working = true
  64. start(Callable(self, '_main').bind(owner._global_mq))
  65. func stop_working(wait:bool = true) -> void:
  66. should_working = false
  67. if wait:
  68. wait_to_finish()
  69. const WORKER_AMOUNT_MIN:int = 1
  70. var WORKER_AMOUNT_MAX:int = OS.get_processor_count()
  71. var _should_running:bool = true
  72. var _global_mq:Queue = Queue.new()
  73. var _service_and_mq:Dictionary = {}
  74. var _workers:Array = []
  75. func _add_worker() -> void:
  76. if _workers.size() >= WORKER_AMOUNT_MAX:
  77. return
  78. var worker:Worker = Worker.new()
  79. worker.start_working(self)
  80. _workers.push_back(worker)
  81. print('worker count adjust to {0}'.format([_workers.size()]))
  82. func _kick_worker(worker:Worker) -> void:
  83. worker.stop_working(false)
  84. print('worker count adjust to {0}'.format([_workers.size()]))
  85. func start() -> void:
  86. stop()
  87. _should_running = true
  88. check_workers()
  89. func stop() -> void:
  90. _should_running = false
  91. while _workers.size() > 0:
  92. var worker:Worker = _workers.pop_back() as Worker
  93. worker.stop_working()
  94. while true:
  95. var mq:Queue = _global_mq.dequeue() as Queue
  96. if null == mq:
  97. break
  98. _service_and_mq.clear()
  99. func check_workers() -> void:
  100. if !_should_running:
  101. return
  102. # TODO: remove dead workers
  103. # check busy, and give more workers
  104. if _global_mq.size() > 0: # maybe 1 ?
  105. _add_worker()
  106. # ensure min services
  107. while _workers.size() < WORKER_AMOUNT_MIN:
  108. _add_worker()
  109. # release overamount services
  110. while _workers.size() > WORKER_AMOUNT_MAX:
  111. _kick_worker(_workers.pop_back() as Worker)
  112. func send_message(service_id:int, message) -> void:
  113. var mq:Queue = instance_from_id(service_id) as Queue
  114. mq.enqueue(message)
  115. func add_service(callback:Callable) -> int:
  116. if _service_and_mq.has(callback):
  117. return -1
  118. var mq:Queue = Queue.new()
  119. mq.bind_owner(callback)
  120. _service_and_mq[callback] = mq
  121. _global_mq.enqueue(mq)
  122. return mq.get_instance_id()
  123. func remove_service(callback:Callable) -> bool:
  124. if _service_and_mq.has(callback):
  125. var mq:Queue = _service_and_mq[callback]
  126. mq.unbind_owner()
  127. _service_and_mq.erase(callback)
  128. return true
  129. return false
  130. "
  131. [node name="microserver" type="Node"]
  132. script = SubResource("GDScript_r4hca")
  133. [node name="thread_updator" type="Timer" parent="."]
  134. autostart = true
  135. [connection signal="tree_entered" from="." to="." method="start"]
  136. [connection signal="tree_exiting" from="." to="." method="stop"]
  137. [connection signal="timeout" from="thread_updator" to="." method="check_workers"]