[gd_scene load_steps=2 format=3 uid="uid://cdt4rn31xeo85"]

[sub_resource type="GDScript" id="GDScript_r4hca"]
script/source = "extends Node
## Message dispatcher backed by a pool of worker threads.
##
## Every service registered with add_service() gets its own message Queue.
## All service queues circulate through one shared queue (_global_mq): a
## worker takes a service queue out of it, delivers at most one message to the
## service callback, and puts the service queue back.
## The scene connects tree_entered to start(), tree_exiting to stop(), and the
## timeout signal of the thread_updator Timer to check_workers().


## Mutex-guarded FIFO that can also carry an owner value.
class Queue:
	extends RefCounted

	# Backing storage; only touched while _mutex is held.
	var _opaque:Array = []
	var _mutex:Mutex = Mutex.new()
	# Value bound with bind_owner(); the dispatcher stores the service Callable here.
	var _owner

	## Returns the number of queued values.
	func size() -> int:
		_mutex.lock()
		var count:int = _opaque.size()
		_mutex.unlock()
		return count

	## Appends val to the back of the queue.
	## When check_exist is true, a value that is already queued is not added again.
	## Returns true if val was appended, false if it was rejected as a duplicate.
	func enqueue(val, check_exist:bool = false) -> bool:
		_mutex.lock()
		var accepted:bool = !check_exist || !_opaque.has(val)
		if accepted:
			_opaque.push_back(val)
		_mutex.unlock()
		return accepted

	## Removes and returns the front value, or null when the queue is empty.
	## A queued null therefore cannot be told apart from an empty queue.
	func dequeue() -> Variant:
		_mutex.lock()
		var front = _opaque.pop_front()
		_mutex.unlock()
		return front

	## Stores owner as the owner of this queue.
	func bind_owner(owner:Variant) -> void:
		_mutex.lock()
		_owner = owner
		_mutex.unlock()

	## Returns the bound owner, or null when none is bound.
	func get_owner() -> Variant:
		_mutex.lock()
		var owner = _owner
		_mutex.unlock()
		return owner

	## Clears the bound owner.
	func unbind_owner() -> void:
		_mutex.lock()
		_owner = null
		_mutex.unlock()


## Thread that repeatedly pulls service queues from the shared queue and
## delivers one message per turn to the owning callback.
class Worker:
	extends Thread

	# Loop flag: set by start_working(), cleared by stop_working(), read by _main().
	var should_working:bool

	## Thread body. Runs until should_working becomes false.
	func _main(global_mq:Queue):
		print(\"Worker thread started [id: {0}]\".format([get_id()]))
		while should_working:
			var mq:Queue = global_mq.dequeue()
			if null == mq:
				# No service queue available right now; poll again.
				OS.delay_msec(0)
				continue
			var callback = mq.get_owner()
			if null == callback:
				# The owner was unbound (see remove_service). Not re-enqueueing
				# the queue is what retires it, whether or not it still holds
				# messages; otherwise an empty orphan would circulate forever.
				continue
			var msg = mq.dequeue()
			if null == msg:
				OS.delay_msec(0)
			else:
				callback.call(msg)
			# Hand the service queue back so any worker can serve it next.
			global_mq.enqueue(mq)
		print(\"Worker thread stopped [id: {0}]\".format([get_id()]))

	## Starts the thread on _main, bound to the _global_mq of owner.
	func start_working(owner) -> void:
		should_working = true
		start(Callable(self, '_main').bind(owner._global_mq))

	## Asks the loop to exit. When wait is true, also joins the thread.
	## The is_started() guard makes a repeated join a no-op.
	func stop_working(wait:bool = true) -> void:
		should_working = false
		if wait && is_started():
			wait_to_finish()


const WORKER_AMOUNT_MIN:int = 1
var WORKER_AMOUNT_MAX:int = OS.get_processor_count()

var _should_running:bool = true
# Shared queue whose elements are the per-service Queue objects.
var _global_mq:Queue = Queue.new()
# Maps a service Callable to its Queue.
var _service_and_mq:Dictionary = {}
var _workers:Array = []
# Workers told to stop by _kick_worker() that have not been joined yet.
var _retired_workers:Array = []


## Starts one more worker unless WORKER_AMOUNT_MAX is already reached.
func _add_worker() -> void:
	if _workers.size() >= WORKER_AMOUNT_MAX:
		return
	var worker:Worker = Worker.new()
	worker.start_working(self)
	_workers.push_back(worker)
	print('worker count adjust to {0}'.format([_workers.size()]))


## Signals worker to stop without blocking the caller.
## The worker is kept in _retired_workers so it can still be joined; a Thread
## should have wait_to_finish() called on it before it is destroyed.
func _kick_worker(worker:Worker) -> void:
	worker.stop_working(false)
	_retired_workers.push_back(worker)
	print('worker count adjust to {0}'.format([_workers.size()]))


## Joins retired workers. With wait == false only workers whose thread
## function has already returned are joined; with wait == true all of them are.
func _reap_retired_workers(wait:bool = false) -> void:
	var still_running:Array = []
	for worker in _retired_workers:
		if wait || !worker.is_alive():
			worker.stop_working()
		else:
			still_running.push_back(worker)
	_retired_workers = still_running


## Restarts the dispatcher: stops everything, then brings the pool back up.
## Because stop() clears the registry, previously added services are dropped.
func start() -> void:
	stop()
	_should_running = true
	check_workers()


## Stops and joins every worker, empties the shared queue and forgets all services.
func stop() -> void:
	_should_running = false
	while !_workers.is_empty():
		var worker:Worker = _workers.pop_back() as Worker
		worker.stop_working()
	_reap_retired_workers(true)
	# Drain whatever service queues are still parked in the shared queue.
	while null != _global_mq.dequeue():
		pass
	_service_and_mq.clear()


## Adjusts the worker pool; does nothing while the dispatcher is stopped.
func check_workers() -> void:
	if !_should_running:
		return
	# Join kicked workers that have finished in the meantime.
	_reap_retired_workers()
	# TODO: remove dead workers
	# check busy, and give more workers
	if _global_mq.size() > 0: # maybe 1 ?
		_add_worker()
	# ensure min services
	while _workers.size() < WORKER_AMOUNT_MIN:
		_add_worker()
	# release overamount services
	while _workers.size() > WORKER_AMOUNT_MAX:
		_kick_worker(_workers.pop_back() as Worker)


## Queues message for the service identified by service_id, the value
## returned by add_service(). An id that does not resolve to a live Queue
## (stale id, or the -1 returned for a duplicate registration) is reported
## with a warning instead of crashing.
func send_message(service_id:int, message) -> void:
	var mq:Queue = instance_from_id(service_id) as Queue
	if null == mq:
		push_warning('send_message: no service queue for id {0}'.format([service_id]))
		return
	mq.enqueue(message)


## Registers callback as a service and returns the id to use with
## send_message(). Returns -1 when callback is already registered.
func add_service(callback:Callable) -> int:
	if _service_and_mq.has(callback):
		return -1
	var mq:Queue = Queue.new()
	mq.bind_owner(callback)
	_service_and_mq[callback] = mq
	_global_mq.enqueue(mq)
	return mq.get_instance_id()


## Unregisters callback. Returns true if it was registered, false otherwise.
## The queue is only unbound here; a worker drops it from the shared queue
## the next time it picks it up.
func remove_service(callback:Callable) -> bool:
	if _service_and_mq.has(callback):
		var mq:Queue = _service_and_mq[callback]
		mq.unbind_owner()
		_service_and_mq.erase(callback)
		return true
	return false
"

[node name="microserver" type="Node"]
script = SubResource("GDScript_r4hca")

[node name="thread_updator" type="Timer" parent="."]
autostart = true

[connection signal="tree_entered" from="." to="." method="start"]
[connection signal="tree_exiting" from="." to="." method="stop"]
[connection signal="timeout" from="thread_updator" to="." method="check_workers"]