Have you ever wanted to do more things with Godot's signals? Maybe merge them or filter them Rx-style? Then Signal Extensions (or Sx for short) are for you. This simple and lightweight library allows you to do basic operations on regular Signals and treat them like reactive-ish streams.
DISCLAIMER: This addon is NOT an Rx implementation. It does not, and never will, implement all the functionality and operators of Rx. If you need a proper Rx implementation for Godot 4, see the excellent GodotRx project, which largely inspired Sx.
The main goal of this library is to allow for more advanced signal handling in Godot.
Godot's built-in signal system is robust, but it could really benefit from a reactive approach, which works well in games, especially when many components interact with each other.
These kinds of manipulations could be done with an Rx framework (like GodotRx mentioned above), but that means including a package that, due to the sheer scale of the mechanisms pure Rx needs, is large and heavy relative to the use cases you are likely to encounter in your code.
Instead of including a big and complex Rx solution, why not extend the existing mechanism?
Sx allows for signal manipulation that feels like Rx, without all the overhead.
- Copy the addons/signal_extensions directory to the /addons/ directory in your project
- Enable the SignalExtensions plugin in Project Settings -> Plugins
- You're done! You now have access to the Sx singleton!
In order to perform operations on signals, they have to be converted to SxSignal:
signal my_signal
var my_wrapped_signal := Sx.from(my_signal)
To subscribe to emissions, use .subscribe(Callable):
signal my_signal
Sx.from(my_signal).subscribe(func(): print("Got it!"))
my_signal.emit()
# result:
# Got it!
Much like when connecting to native signals, some flags can be passed to control the signal connection. See: enum Object.ConnectFlags. Please note that using CONNECT_ONE_SHOT might break the subscription system of GodotSx. Use .first() operator instead.
signal my_signal
# this will defer emissions to idle frame, instead of sending them immediately.
Sx.from(my_signal).subscribe(func(): pass, CONNECT_DEFERRED)
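Since CONNECT_ONE_SHOT is discouraged above, a one-time subscription can be sketched with the first() operator instead. This is a minimal illustration that uses only calls shown elsewhere in this README; the signal name `door_opened` is hypothetical, and the script assumes the Sx singleton is enabled:

```gdscript
extends Node

signal door_opened  # hypothetical signal, for illustration only

func _ready() -> void:
	# first() lets one emission through and then completes,
	# so Sx disposes the subscription itself.
	Sx.from(door_opened).first().subscribe(func(): print("Opened once"))
	door_opened.emit()
	door_opened.emit()  # not printed: the subscription has already completed
```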
You can filter and map emitted items much like in regular Rx implementations:
signal value_changed(int)
# multiply only positive numbers by 2
Sx.from(value_changed) \
.filter(func(value: int): return value > 0) \
.map(func(value: int): return value * 2) \
.subscribe(func(value: int): print(value))
value_changed.emit(-2)
value_changed.emit(-1)
value_changed.emit(0)
value_changed.emit(1)
value_changed.emit(2)
# result:
# 2
# 4
Sx supports signals with up to 6 arguments and they can also be filtered and mapped:
signal multi_values(int, int2)
# when mapping multiple values, array must be returned from lambda
Sx.from(multi_values) \
.filter(func(value1: int, value2: int): return value2 > value1) \
.map(func(value1: int, value2: int): return [value2, value1]) \
.subscribe(func(value1: int, value2: int): print(value1, " ", value2))
multi_values.emit(2, 1)
multi_values.emit(3, 10)
# result:
# 10 3
The number of arguments passed down the chain can be changed freely using map:
signal int_values(int)
Sx.from(int_values) \
.map(func(value: int): return [value, value * 2]) \
.subscribe(func(value1: int, value2: int): print(value1, " ", value2))
int_values.emit(3)
# result:
# 3 6
Multiple Godot signals can be merged into one using Sx.merge_from():
signal signal1(int)
signal signal2(int)
Sx.merge_from([signal1, signal2]).subscribe(func(value: int): print(value))
signal1.emit(1)
signal2.emit(2)
# result:
# 1
# 2
Multiple SxSignals can also be merged easily using Sx.merge():
signal signal1(int)
signal signal2(int)
Sx.merge([
Sx.from(signal1).map(func(value: int): return value * 2),
Sx.from(signal2).map(func(value: int): return value * 3)
]).subscribe(func(value: int): print(value))
signal1.emit(1)
signal2.emit(2)
# result:
# 2
# 6
Alternatively, you can use the merge operator:
signal signal1(int)
signal signal2(int)
Sx.from(signal1).map(func(value: int): return value * 2).merge([
Sx.from(signal2).map(func(value: int): return value * 3)
]).subscribe(func(value: int): print(value))
signal1.emit(1)
signal2.emit(2)
# result:
# 2
# 6
To simplify the creation of periodic interval timers (without the hassle of creating and managing a Timer node yourself), you can use Sx.interval_timer() to create a SxSignal that will periodically emit items.
Sx.interval_timer(1.0).subscribe(func(): print("Tick!"))
# prints 'Tick!' every second
interval_timer() also accepts optional parameters to control the process mode and process callback of the timer. For more information about what they do, check the Godot docs for Node and Timer respectively.
Sx.interval_timer(
1.0,
Node.PROCESS_MODE_ALWAYS,
Timer.TIMER_PROCESS_PHYSICS
).subscribe(func(): print("Tick!"))
This Timer will automatically destroy itself once all the subscriptions are disposed.
Creation of one-shot timers this way is not supported, but you can just do:
Sx.from(get_tree().create_timer(1.0).timeout).subscribe(func(): print("Timeout!"))
# or:
Sx.interval_timer(1.0).first().subscribe(func(): print("Timeout!"))
Sx also provides debounce() and throttle() operators, which debounce or throttle the emission of signals based on the provided time duration.
var text_edit: TextEdit
# text is printed only after 0.25 seconds have elapsed since the last keystroke
Sx.from(text_edit.text_changed).debounce(0.25).subscribe(func(): print(text_edit.text))
# text is printed every 0.25 seconds while typing continuously
Sx.from(text_edit.text_changed).throttle(0.25).subscribe(func(): print(text_edit.text))
Sx allows for scanning and buffering incoming values inside a stateful operator. This operator behaves similarly to reduce() in functional programming.
signal numbers(value: int)
Sx.from(numbers).scan(
	func(acc: int, value: int):
		return acc + value,
	0
).subscribe(func(value: int): print(value))
numbers.emit(3)
numbers.emit(2)
numbers.emit(7)
# result:
# 3
# 5
# 12
This can also be useful if you want to collect previous emissions:
signal numbers(value: int)
Sx.from(numbers).scan(
	func(acc: Array[int], value: int):
		acc.append(value)
		return acc,
	[]
).subscribe(func(value: Array[int]): print(value))
numbers.emit(3)
numbers.emit(2)
numbers.emit(7)
# result:
# [3]
# [3, 2]
# [3, 2, 7]
Because of how the reducing function works, any signal arguments are passed to it after the accumulator, but the function should return a single value.
The operator that follows scan will therefore receive ONE argument.
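To make the multi-argument case concrete, here is a minimal sketch of scan over a two-argument signal. It relies only on the behaviour described above; the signal `item_sold` and its parameter names are hypothetical:

```gdscript
extends Node

signal item_sold(price: int, quantity: int)  # hypothetical signal

func _ready() -> void:
	Sx.from(item_sold).scan(
		func(total: int, price: int, quantity: int):
			# the signal arguments arrive after the accumulator
			return total + price * quantity,
		0
	).subscribe(func(total: int): print(total))  # a single argument after scan
	item_sold.emit(2, 3)
	item_sold.emit(1, 4)
	# result:
	# 6
	# 10
```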
When you're subscribing, you can set an optional callback that will be fired when the signal completes (either naturally, or when signal is disposed).
signal numbers(int)
Sx.from(numbers).first().subscribe(
func(value: int): print(value),
0, # no connect flags
func(): print("Completed")
)
numbers.emit(5)
# result:
# 5
# Completed
Or:
signal numbers(int)
var disposable := Sx.from(numbers).subscribe(
func(value: int): print(value),
0, # no connect flags
func(): print("Completed")
)
numbers.emit(5)
disposable.dispose()
# result:
# 5
# Completed
SxSignals can dispose themselves (and disconnect from the underlying signals) when certain operators are used. These include:
- take_while
- take
- element_at
- first
When they finish their emissions, they dispose themselves according to the operator used.
signal numbers(value)
Sx.from(numbers).take_while(func(value: int): return value < 0) \
.subscribe(
func(value: int): print(value),
0, # no connect flags
func(): print("Completed")
)
numbers.emit(-2)
numbers.emit(-1)
numbers.emit(0)
numbers.emit(1)
numbers.emit(2)
# result:
# -2
# -1
# Completed
signal numbers(value)
Sx.from(numbers).first().subscribe(
func(value: int): print(value),
0, # no connect flags
func(): print("Completed")
)
numbers.emit(-2)
numbers.emit(-1)
numbers.emit(0)
numbers.emit(1)
numbers.emit(2)
# result:
# -2
# Completed
You might want to dispose SxSignals manually, or automatically when the subscribing Node exits the tree (both of which are highly recommended to make sure no accidental memory leaks occur).
The subscribe() method returns a SxDisposable object which allows for:
- manual subscription disposal (and subsequent disconnection from the signal) using dispose()
- automatic disposal when a Node is exiting the scene tree using dispose_with(Node)
- adding the disposable to a SxCompositeDisposable using dispose_with(SxCompositeDisposable)
extends Node
func _ready() -> void:
	Sx.from(some_other_node.my_signal).subscribe(func(): pass).dispose_with(self)
Composite disposable:
signal test_signal
var composite_disposable := SxCompositeDisposable.new()
Sx.from(test_signal).subscribe(func(): print("First subscription")).dispose_with(composite_disposable)
Sx.from(test_signal).subscribe(func(): print("Second subscription")).dispose_with(composite_disposable)
composite_disposable.dispose()
Disposables can also be added to a SxCompositeDisposable directly:
signal some_signal
var composite_disposable := SxCompositeDisposable.new()
var disposable := Sx.from(some_signal).subscribe(func(): pass)
composite_disposable.append(disposable)
composite_disposable.dispose()
Sometimes, you need to store some values and react when they change. For this reason, Sx provides its own implementation of Signal-based values, much like ReactiveProperties in GodotRx and UniRx.
var property := SxProperty.new(10)
property.as_signal().subscribe(func(value: int): print(value))
property.value = 15
# result:
# 10
# 15
You can also directly access the underlying signal:
property.value_changed.connect(func(value: int): print(value))
Also, in case you don't want the initial emission when subscribing to SxProperty, you can pass false to as_signal():
var property := SxProperty.new(10)
property.as_signal(false).subscribe(func(value: int): print(value))
property.value = 15
# result:
# 15
In case you want to remember the last emission from a signal, while keeping the ability to automatically update the value and notify subscribers, the SxSignalProperty is for you.
This special type of SxProperty receives a SxSignal and an optional initial_value.
It subscribes to the passed signal and assigns the payload to the internal variable whenever an emission occurs.
signal number(value: int)
var property: SxProperty
property = SxSignalProperty.new(
	Sx.from(number).map(
		func(value: int):
			return value * 2
	),
	0
)
print(property.value)
number.emit(2)
print(property.value)
# result:
# 0
# 4
Of course, SxSignalProperty behaves just like SxProperty, so you can subscribe to that property using as_signal():
signal number(value: int)
var property: SxProperty
property = SxSignalProperty.new(
	Sx.from(number).map(
		func(value: int):
			return value * 2
	),
	0
)
property.as_signal().subscribe(
	func(value: int):
		print(value)
)
number.emit(2)
# result:
# 0
# 4
Setting the value through .value will work and will notify all subscribers; however, that value will be replaced as soon as a new emission occurs.
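A minimal sketch of that overwrite behaviour, using only calls shown in this section (the concrete values are arbitrary):

```gdscript
extends Node

signal number(value: int)

func _ready() -> void:
	var property := SxSignalProperty.new(Sx.from(number), 0)
	property.value = 100   # a manual assignment is accepted...
	print(property.value)
	number.emit(2)         # ...but the next emission replaces it
	print(property.value)
	# result:
	# 100
	# 2
```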
There are also wrappers around Array and Dictionary, called SxArrayProperty and SxDictionaryProperty, but they are more complex. All operations that change the number of items should go through the Sx wrapper; everything else, such as filtering and mapping items, is only available through the special getter .value, which returns the underlying Array or Dictionary.
var array := SxArrayProperty.new()
array.as_signal().subscribe(func(type: SxArrayProperty.Event, current_array: Array, payload: Variant):
	print(type, current_array, payload)
)
array.append(2)
# result:
# SxArrayProperty.Event.UPDATED_LIST [] []
# SxArrayProperty.Event.UPDATED [2] 2
# SxArrayProperty.Event.COUNT_CHANGED [2] 1
var dict := SxDictionaryProperty.new()
dict.as_signal().subscribe(func(type: SxDictionaryProperty.Event, current: Dictionary, payload: Variant):
	print(type, current, payload)
)
dict.set_value("test", 2)
# result:
# SxDictionaryProperty.Event.UPDATED_LIST {} {}
# SxDictionaryProperty.Event.UPDATED {"test":2} "test"
# SxDictionaryProperty.Event.COUNT_CHANGED {"test":2} 1
When getting the underlying value, use .value or .get_index()/.get_value():
var array := SxArrayProperty.new([1])
print(array.value[0])
print(array.get_index(0))
# result:
# 1
# 1
To observe specific events:
var array := SxArrayProperty.new([1])
array.observe(SxArrayProperty.Event.UPDATED).subscribe(func(current_array: Array, payload: Variant):
	print(current_array, payload)
)
Both .as_signal() and .observe() can take an optional bool argument emit_initial_value which can be set to false. Doing so will not emit the current state when subscribing:
var dict := SxDictionaryProperty.new()
dict.as_signal(false).subscribe(func(type: SxDictionaryProperty.Event, current: Dictionary, payload: Variant):
	print(type, current, payload)
)
dict.set_value("test", 2)
# result:
# SxDictionaryProperty.Event.UPDATED {"test":2} "test"
# SxDictionaryProperty.Event.COUNT_CHANGED {"test":2} 1
Both SxArrayProperty and SxDictionaryProperty implement custom iterators, so they can be iterated over in for loops:
var array := SxArrayProperty.new([10, 20, 30])
for item in array:
	print(item)
# result:
# 10
# 20
# 30
var dict := SxDictionaryProperty.new({test1 = 1, test2 = 2})
for key in dict:
	print(key)
# result:
# test1
# test2
- debounce
- delay
- element_at
- filter
- first
- map
- merge
- merge_from
- scan
- skip
- skip_while
- start_with
- take
- take_while
- throttle
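Some operators in this list, such as skip and take, have no example in this README. The sketch below assumes that each accepts a single integer item count, as is common in Rx-style libraries; that signature is an assumption, not something stated here, so check the addon source before relying on it. If the assumption holds, the chain would ignore the first emission, pass the next two through, and then complete.

```gdscript
extends Node

signal numbers(value: int)

func _ready() -> void:
	# ASSUMPTION: skip() and take() each take one integer count argument.
	Sx.from(numbers) \
		.skip(1) \
		.take(2) \
		.subscribe(
			func(value: int): print(value),
			0, # no connect flags
			func(): print("Completed")
		)
	for i in range(5):
		numbers.emit(i)
```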
Please note that full implementation of all Rx operators is NOT a goal of this library. If you have a more complex problem that cannot be solved with Sx, then use GodotRx instead.
This library uses gdUnit4 as the unit test framework, but it is not provided in this repo. In order to run the tests, it needs to be installed manually (more information here: https://mikeschulze.github.io/gdUnit4/first_steps/install/). After that, tests can be run from the editor.
Distributed under the MIT License.