|
@@ -0,0 +1,162 @@
|
|
|
|
+[gd_scene load_steps=2 format=3 uid="uid://cdt4rn31xeo85"]
|
|
|
|
+
|
|
|
|
+[sub_resource type="GDScript" id="GDScript_r4hca"]
|
|
|
|
+script/source = "extends Node
|
|
|
|
+
|
|
|
|
+class Queue:
|
|
|
|
+ extends RefCounted
|
|
|
|
+ var _opaque:Array = []
|
|
|
|
+ var _mutex:Mutex = Mutex.new()
|
|
|
|
+ var _owner
|
|
|
|
+
|
|
|
|
+ func size() -> int:
|
|
|
|
+ var _size:int
|
|
|
|
+ _mutex.lock()
|
|
|
|
+ _size = _opaque.size()
|
|
|
|
+ _mutex.unlock()
|
|
|
|
+ return _size
|
|
|
|
+
|
|
|
|
+ func enqueue(val, check_exist:bool = false) -> bool:
|
|
|
|
+ _mutex.lock()
|
|
|
|
+ if (!check_exist || !_opaque.has(val)):
|
|
|
|
+ _opaque.push_back(val)
|
|
|
|
+ _mutex.unlock()
|
|
|
|
+ return true
|
|
|
|
+
|
|
|
|
+ func dequeue() -> Variant:
|
|
|
|
+ var ret = null
|
|
|
|
+ _mutex.lock()
|
|
|
|
+ ret = _opaque.pop_front()
|
|
|
|
+ _mutex.unlock()
|
|
|
|
+ return ret
|
|
|
|
+
|
|
|
|
+ func bind_owner(owner:Variant) -> void:
|
|
|
|
+ _mutex.lock()
|
|
|
|
+ _owner = owner
|
|
|
|
+ _mutex.unlock()
|
|
|
|
+
|
|
|
|
+ func get_owner() -> Variant:
|
|
|
|
+ var owner = null
|
|
|
|
+ _mutex.lock()
|
|
|
|
+ owner = _owner
|
|
|
|
+ _mutex.unlock()
|
|
|
|
+ return owner
|
|
|
|
+
|
|
|
|
+ func unbind_owner() -> void:
|
|
|
|
+ _mutex.lock()
|
|
|
|
+ _owner = null
|
|
|
|
+ _mutex.unlock()
|
|
|
|
+
|
|
|
|
+class Worker:
|
|
|
|
+ extends Thread
|
|
|
|
+ var should_working:bool
|
|
|
|
+
|
|
|
|
+ func _main(global_mq:Queue):
|
|
|
|
+ print(\"Worker thread started [id: {0}]\".format([get_id()]))
|
|
|
|
+ while should_working:
|
|
|
|
+ var mq:Queue = global_mq.dequeue()
|
|
|
|
+ if null == mq:
|
|
|
|
+ OS.delay_msec(0)
|
|
|
|
+ else:
|
|
|
|
+ var msg = mq.dequeue()
|
|
|
|
+ if null == msg:
|
|
|
|
+ OS.delay_msec(0)
|
|
|
|
+ else:
|
|
|
|
+ var callback = mq.get_owner()
|
|
|
|
+ if null == callback:
|
|
|
|
+ continue
|
|
|
|
+ else:
|
|
|
|
+ callback.call(msg)
|
|
|
|
+ global_mq.enqueue(mq)
|
|
|
|
+ print(\"Worker thread stopped [id: {0}]\".format([get_id()]))
|
|
|
|
+
|
|
|
|
+ func start_working(owner) -> void:
|
|
|
|
+ should_working = true
|
|
|
|
+ start(Callable(self, '_main').bind(owner._global_mq))
|
|
|
|
+
|
|
|
|
+ func stop_working(wait:bool = true) -> void:
|
|
|
|
+ should_working = false
|
|
|
|
+ if wait:
|
|
|
|
+ wait_to_finish()
|
|
|
|
+
|
|
|
|
+const WORKER_AMOUNT_MIN:int = 1
|
|
|
|
+var WORKER_AMOUNT_MAX:int = OS.get_processor_count()
|
|
|
|
+
|
|
|
|
+var _should_running:bool = true
|
|
|
|
+var _global_mq:Queue = Queue.new()
|
|
|
|
+var _service_and_mq:Dictionary = {}
|
|
|
|
+var _workers:Array = []
|
|
|
|
+
|
|
|
|
+func _add_worker() -> void:
|
|
|
|
+ if _workers.size() >= WORKER_AMOUNT_MAX:
|
|
|
|
+ return
|
|
|
|
+ var worker:Worker = Worker.new()
|
|
|
|
+ worker.start_working(self)
|
|
|
|
+ _workers.push_back(worker)
|
|
|
|
+ print('worker count adjust to {0}'.format([_workers.size()]))
|
|
|
|
+
|
|
|
|
+func _kick_worker(worker:Worker) -> void:
|
|
|
|
+ worker.stop_working(false)
|
|
|
|
+ print('worker count adjust to {0}'.format([_workers.size()]))
|
|
|
|
+
|
|
|
|
+func start() -> void:
|
|
|
|
+ stop()
|
|
|
|
+ _should_running = true
|
|
|
|
+ check_workers()
|
|
|
|
+
|
|
|
|
+func stop() -> void:
|
|
|
|
+ _should_running = false
|
|
|
|
+ while _workers.size() > 0:
|
|
|
|
+ var worker:Worker = _workers.pop_back() as Worker
|
|
|
|
+ worker.stop_working()
|
|
|
|
+ while true:
|
|
|
|
+ var mq:Queue = _global_mq.dequeue() as Queue
|
|
|
|
+ if null == mq:
|
|
|
|
+ break
|
|
|
|
+ _service_and_mq.clear()
|
|
|
|
+
|
|
|
|
+func check_workers() -> void:
|
|
|
|
+ if !_should_running:
|
|
|
|
+ return
|
|
|
|
+ # TODO: remove dead workers
|
|
|
|
+ # check busy, and give more workers
|
|
|
|
+ if _global_mq.size() > 0: # maybe 1 ?
|
|
|
|
+ _add_worker()
|
|
|
|
+ # ensure min services
|
|
|
|
+ while _workers.size() < WORKER_AMOUNT_MIN:
|
|
|
|
+ _add_worker()
|
|
|
|
+ # release overamount services
|
|
|
|
+ while _workers.size() > WORKER_AMOUNT_MAX:
|
|
|
|
+ _kick_worker(_workers.pop_back() as Worker)
|
|
|
|
+
|
|
|
|
+func send_message(service_id:int, message) -> void:
|
|
|
|
+ var mq:Queue = instance_from_id(service_id) as Queue
|
|
|
|
+ mq.enqueue(message)
|
|
|
|
+
|
|
|
|
+func add_service(callback:Callable) -> int:
|
|
|
|
+ if _service_and_mq.has(callback):
|
|
|
|
+ return -1
|
|
|
|
+ var mq:Queue = Queue.new()
|
|
|
|
+ mq.bind_owner(callback)
|
|
|
|
+ _service_and_mq[callback] = mq
|
|
|
|
+ _global_mq.enqueue(mq)
|
|
|
|
+ return mq.get_instance_id()
|
|
|
|
+
|
|
|
|
+func remove_service(callback:Callable) -> bool:
|
|
|
|
+ if _service_and_mq.has(callback):
|
|
|
|
+ var mq:Queue = _service_and_mq[callback]
|
|
|
|
+ mq.unbind_owner()
|
|
|
|
+ _service_and_mq.erase(callback)
|
|
|
|
+ return true
|
|
|
|
+ return false
|
|
|
|
+"
|
|
|
|
+
|
|
|
|
+[node name="microserver" type="Node"]
|
|
|
|
+script = SubResource("GDScript_r4hca")
|
|
|
|
+
|
|
|
|
+[node name="thread_updator" type="Timer" parent="."]
|
|
|
|
+autostart = true
|
|
|
|
+
|
|
|
|
+[connection signal="tree_entered" from="." to="." method="start"]
|
|
|
|
+[connection signal="tree_exiting" from="." to="." method="stop"]
|
|
|
|
+[connection signal="timeout" from="thread_updator" to="." method="check_workers"]
|