123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- [gd_scene load_steps=2 format=3 uid="uid://cdt4rn31xeo85"]
- [sub_resource type="GDScript" id="GDScript_r4hca"]
- script/source = "extends Node
# Mutex-guarded FIFO container that can also carry one attached owner value.
# Every accessor takes _mutex around its access to the shared fields.
class Queue:
	extends RefCounted

	# Backing storage; items are appended at the back and removed from the front.
	var _opaque:Array = []
	# Guards _opaque and _owner.
	var _mutex:Mutex = Mutex.new()
	# Value attached through bind_owner(); null after unbind_owner().
	var _owner

	# Returns the number of items currently queued.
	func size() -> int:
		_mutex.lock()
		var count:int = _opaque.size()
		_mutex.unlock()
		return count

	# Appends val at the back of the queue.
	# When check_exist is true, val is not appended if the queue already has it.
	# Returns true if val was appended, false if it was rejected as a duplicate
	# (previously the result was always true, so callers could not tell).
	func enqueue(val, check_exist:bool = false) -> bool:
		_mutex.lock()
		var accepted:bool = !check_exist || !_opaque.has(val)
		if accepted:
			_opaque.push_back(val)
		_mutex.unlock()
		return accepted

	# Removes and returns the front item, or null when the queue is empty.
	func dequeue() -> Variant:
		_mutex.lock()
		var front = _opaque.pop_front()
		_mutex.unlock()
		return front

	# Attaches owner to this queue, replacing any previous value.
	func bind_owner(owner:Variant) -> void:
		_mutex.lock()
		_owner = owner
		_mutex.unlock()

	# Returns the attached owner value, or null if none is attached.
	func get_owner() -> Variant:
		_mutex.lock()
		var current = _owner
		_mutex.unlock()
		return current

	# Detaches the owner by resetting it to null.
	func unbind_owner() -> void:
		_mutex.lock()
		_owner = null
		_mutex.unlock()
# Thread subclass whose body polls a shared queue of per-service queues and
# hands each dequeued message to the value bound as that queue's owner.
class Worker:
	extends Thread

	# Loop flag read by _main; set by start_working() and cleared by stop_working().
	var should_working:bool

	# Thread body. global_mq yields Queue objects; each of those yields messages.
	func _main(global_mq:Queue):
		print(\"Worker thread started [id: {0}]\".format([get_id()]))
		while should_working:
			var service_queue:Queue = global_mq.dequeue()
			if service_queue == null:
				# Nothing to service right now.
				OS.delay_msec(0)
				continue
			var message = service_queue.dequeue()
			if message == null:
				OS.delay_msec(0)
			else:
				var handler = service_queue.get_owner()
				if handler == null:
					# Owner was unbound: the queue is not handed back, so it
					# leaves the rotation (the dequeued message is discarded).
					continue
				handler.call(message)
			# Hand the service queue back so it gets polled again.
			global_mq.enqueue(service_queue)
		print(\"Worker thread stopped [id: {0}]\".format([get_id()]))

	# Raises the loop flag and starts the thread on _main, bound to owner._global_mq.
	func start_working(owner) -> void:
		should_working = true
		var entry_point:Callable = Callable(self, '_main').bind(owner._global_mq)
		start(entry_point)

	# Clears the loop flag; when wait is true, also blocks until the thread ends.
	func stop_working(wait:bool = true) -> void:
		should_working = false
		if !wait:
			return
		wait_to_finish()
# Lower bound on the worker pool enforced by check_workers().
const WORKER_AMOUNT_MIN := 1
# Upper bound on the worker pool; initialised to the reported processor count.
var WORKER_AMOUNT_MAX := OS.get_processor_count()
# When false, check_workers() does nothing. Toggled by start() / stop().
var _should_running := true
# Shared queue of per-service Queue objects that workers poll.
var _global_mq := Queue.new()
# Maps a registered Callable to the Queue created for it.
var _service_and_mq := {}
# Worker objects created by _add_worker().
var _workers := []
# Starts one more Worker and records it, unless the pool already holds
# WORKER_AMOUNT_MAX workers (in which case nothing happens).
func _add_worker() -> void:
	if _workers.size() < WORKER_AMOUNT_MAX:
		var spawned:Worker = Worker.new()
		spawned.start_working(self)
		_workers.append(spawned)
		print('worker count adjust to {0}'.format([_workers.size()]))
# Asks worker to stop without waiting for its thread to finish, then logs the
# current pool size. The caller is expected to have removed worker from _workers.
# NOTE(review): the thread is never joined on this path - confirm that is
# acceptable for Thread cleanup.
func _kick_worker(worker:Worker) -> void:
	var wait_for_exit:bool = false
	worker.stop_working(wait_for_exit)
	var remaining:int = _workers.size()
	print('worker count adjust to {0}'.format([remaining]))
# (Re)starts the service: performs a full stop() first, then re-enables
# running and lets check_workers() create the initial worker(s).
# Because stop() clears the service table, services registered before this
# call are dropped.
func start() -> void:
	self.stop()
	self._should_running = true
	self.check_workers()
# Shuts everything down: disables check_workers(), stops every worker
# (waiting for each thread to finish), empties the shared queue and forgets
# all registered services.
func stop() -> void:
	_should_running = false
	# Stop workers newest-first; stop_working() waits by default.
	while !_workers.is_empty():
		var worker:Worker = _workers.pop_back() as Worker
		worker.stop_working()
	# Drain the shared queue until it reports empty (null).
	var pending:Queue = _global_mq.dequeue() as Queue
	while pending != null:
		pending = _global_mq.dequeue() as Queue
	_service_and_mq.clear()
# Periodic pool maintenance; does nothing while the service is stopped.
func check_workers() -> void:
	if !_should_running:
		return
	# TODO: remove dead workers
	# Queues waiting in the shared queue: try to add one worker.
	var backlog:int = _global_mq.size()
	if backlog > 0: # maybe 1 ?
		_add_worker()
	# Keep at least WORKER_AMOUNT_MIN workers.
	while WORKER_AMOUNT_MIN > _workers.size():
		_add_worker()
	# Release workers beyond WORKER_AMOUNT_MAX, newest first.
	while WORKER_AMOUNT_MAX < _workers.size():
		var surplus:Worker = _workers.pop_back() as Worker
		_kick_worker(surplus)
# Queues message for the service identified by service_id, the instance id
# returned by add_service().
# If the id does not resolve to a live Queue (for example the -1 returned for
# a duplicate registration, or a queue that has since been freed), the message
# is dropped with a warning instead of calling a method on null.
func send_message(service_id:int, message) -> void:
	var mq:Queue = instance_from_id(service_id) as Queue
	if mq == null:
		push_warning('send_message: no service queue for id {0}'.format([service_id]))
		return
	mq.enqueue(message)
# Registers callback as a service: creates its Queue, binds callback as the
# queue owner, and puts the queue into the shared queue for workers to poll.
# Returns the new queue's instance id, or -1 if callback is already registered.
func add_service(callback:Callable) -> int:
	if callback in _service_and_mq:
		return -1
	var service_queue:Queue = Queue.new()
	service_queue.bind_owner(callback)
	_service_and_mq[callback] = service_queue
	_global_mq.enqueue(service_queue)
	var service_id:int = service_queue.get_instance_id()
	return service_id
# Unregisters callback: unbinds it from its Queue and removes the table entry.
# Returns true if callback was registered, false otherwise.
func remove_service(callback:Callable) -> bool:
	if !_service_and_mq.has(callback):
		return false
	var service_queue:Queue = _service_and_mq[callback]
	service_queue.unbind_owner()
	_service_and_mq.erase(callback)
	return true
- "
- [node name="microserver" type="Node"]
- script = SubResource("GDScript_r4hca")
- [node name="thread_updator" type="Timer" parent="."]
- autostart = true
- [connection signal="tree_entered" from="." to="." method="start"]
- [connection signal="tree_exiting" from="." to="." method="stop"]
- [connection signal="timeout" from="thread_updator" to="." method="check_workers"]
|