Skip to content

Commit

Permalink
Implement client-server input handling
Browse files Browse the repository at this point in the history
- This incurs round-trip latency rather than 1-way
- This is needed in UnrealLibretro because it will
  yield nice out-of-the-box behavior
  • Loading branch information
N7Alpha committed Jun 8, 2024
1 parent e350855 commit ffcdca3
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 33 deletions.
4 changes: 2 additions & 2 deletions Source/UnrealLibretro/Private/LibretroContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,7 @@ void FLibretroContext::load(const char *sofile) {
libretro_callbacks->input_state = [this](unsigned port, unsigned device, unsigned index, unsigned id) { return core_input_state(port, device, index, id); };
libretro_callbacks->input_poll = [this]() {
memset(InputState, 0, sizeof(InputState)); // @todo To query the input for sparse packets rather than this laborious subroutine
ulnet_input_poll(netplay_session, (ulnet_input_state_t (*)[ULNET_PORT_COUNT]) &InputState); // @todo this should be done in the next procedure also make
ulnet_input_poll(netplay_session, (ulnet_input_state_t (*)[ULNET_PORT_COUNT]) &InputState); // @todo Cast violates strict-aliasing
};
libretro_callbacks->environment = [this](unsigned cmd, void* data) { return core_environment(cmd, data); };
libretro_callbacks->get_current_framebuffer = [this]() { return core.gl.framebuffer; };
Expand Down Expand Up @@ -1197,7 +1197,7 @@ FLibretroContext* FLibretroContext::Launch(ULibretroCoreInstance* LibretroCoreIn
ulnet_core_option_t option = { 0 };
auto state = ulnet_query_generate_next_input(l->netplay_session, &option);
if (state) {
memset(state, 0, sizeof(*state));
memcpy(state, l->NextInputState, sizeof(*state));
}

l->netplay_save_state_size = l->libretro_api.serialize_size();
Expand Down
1 change: 1 addition & 0 deletions Source/UnrealLibretro/Private/LibretroContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ struct FLibretroContext {
* This is what the libretro core reads from when determining input. If you want to use your own input method you can modify this directly.
*/
FLibretroInputState InputState[PortCount];
FLibretroInputState NextInputState[PortCount];

std::atomic<bool> OptionsHaveBeenModified;
TArray<std::atomic<uint8>> OptionSelectedIndex;
Expand Down
4 changes: 2 additions & 2 deletions Source/UnrealLibretro/Private/LibretroCoreInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ void ULibretroCoreInstance::SetInputDigital(int Port, bool Pressed, ERetroDevice

CoreInstance.GetValue()->EnqueueTask([=, CoreInstance = CoreInstance.GetValue()](auto)
{
CoreInstance->InputState[Port][Input] = Pressed;
CoreInstance->NextInputState[Port][Input] = Pressed;
});
}

Expand All @@ -347,7 +347,7 @@ void ULibretroCoreInstance::SetInputAnalog(int Port, int _16BitSignedInteger, ER

CoreInstance.GetValue()->EnqueueTask([=, CoreInstance = CoreInstance.GetValue()](auto)
{
CoreInstance->InputState[Port][Input] = _16BitSignedInteger;
CoreInstance->NextInputState[Port][Input] = _16BitSignedInteger;
});
}

Expand Down
98 changes: 71 additions & 27 deletions Source/UnrealLibretro/Private/ulnet.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

#define ULNET_CHANNEL_EXTRA 0x00
#define ULNET_CHANNEL_INPUT 0x10
#define ULNET_CHANNEL_INPUT_AUDIT_CONSISTENCY 0x20
#define ULNET_CHANNEL_SPECTATOR_INPUT 0x20
#define ULNET_CHANNEL_SAVESTATE_TRANSFER 0x30
#define ULNET_CHANNEL_DESYNC_DEBUG 0xF0

Expand Down Expand Up @@ -184,6 +184,7 @@ typedef struct ulnet_session {
juice_agent_t *agent [SAM2_PORT_MAX + 1 /* Plus Authority */ + ULNET_SPECTATOR_MAX];
int64_t peer_desynced_frame [SAM2_PORT_MAX + 1 /* Plus Authority */ + ULNET_SPECTATOR_MAX];
ulnet_state_t state [SAM2_PORT_MAX + 1 /* Plus Authority */];
ulnet_input_state_t spectator_suggested_input_state[SAM2_PORT_MAX + 1 /* Plus Authority */ + ULNET_SPECTATOR_MAX][ULNET_PORT_COUNT];
unsigned char state_packet_history[SAM2_PORT_MAX + 1 /* Plus Authority */][ULNET_STATE_PACKET_HISTORY_SIZE][ULNET_PACKET_SIZE_BYTES_MAX];
uint64_t peer_needs_sync_bitfield;

Expand Down Expand Up @@ -222,7 +223,9 @@ static inline int ulnet_our_port(ulnet_session_t *session) {
// @todo There is a bug here where we are sending out packets as the authority when we are not the authority
if (session->room_we_are_in.flags & SAM2_FLAG_ROOM_IS_NETWORK_HOSTED) {
int port = sam2_get_port_of_peer(&session->room_we_are_in, session->our_peer_id);
assert(port != -1);
if (port == -1) {
SAM2_LOG_FATAL("Our peer ID %016" PRIx64 " not found in room", session->our_peer_id);
}

return port;
} else {
Expand Down Expand Up @@ -322,7 +325,22 @@ ULNET_LINKAGE ulnet_input_state_t (*ulnet_query_generate_next_input(ulnet_sessio
memset(&session->next_room_xor_delta, 0, sizeof(session->next_room_xor_delta));
//}

// Incoporate input from spectators into our input. This has the drawback of round trip latency but requires a single connection to the server
memset(session->state[ulnet_our_port(session)].input_state[next_buffer_index], 0, sizeof(session->state[ulnet_our_port(session)].input_state[next_buffer_index]));
for (int i = 0; i < SAM2_ARRAY_LENGTH(session->agent); i++) {
if (session->agent[i]) {
for (int p = 0; p < SAM2_PORT_MAX; p++) {
for (int j = 0; j < SAM2_ARRAY_LENGTH(session->state[ulnet_our_port(session)].input_state[next_buffer_index][p]); j++) {
session->state[ulnet_our_port(session)].input_state[next_buffer_index][p][j] |= session->spectator_suggested_input_state[i][p][j];
}
}
}
}

return &session->state[ulnet_our_port(session)].input_state[next_buffer_index];
} else if (ulnet_is_spectator(session, session->our_peer_id)) {
memset(session->spectator_suggested_input_state[63], 0, sizeof(session->spectator_suggested_input_state[63]));
return &session->spectator_suggested_input_state[63];
}

return NULL;
Expand Down Expand Up @@ -381,24 +399,34 @@ ULNET_LINKAGE int ulnet_poll_session(ulnet_session_t *session, bool force_save_s
int status = 0;

session->retro_unserialize = retro_unserialize; // If used this is invoked through a callback within this function call
if ( !ulnet_is_spectator(session, session->our_peer_id)
&& session->room_we_are_in.flags & SAM2_FLAG_ROOM_IS_NETWORK_HOSTED) {
if (session->room_we_are_in.flags & SAM2_FLAG_ROOM_IS_NETWORK_HOSTED) {
uint8_t _[RLE8_ENCODE_UPPER_BOUND(ULNET_PACKET_SIZE_BYTES_MAX)];
ulnet_state_packet_t *input_packet = (ulnet_state_packet_t *) _;
input_packet->channel_and_port = ULNET_CHANNEL_INPUT | ulnet_our_port(session);
ulnet_state_packet_t *input_packet = (ulnet_state_packet_t *) _; // @todo Strict-aliasing

uint8_t *packet_payload;
if (ulnet_is_spectator(session, session->our_peer_id)) {
input_packet->channel_and_port = ULNET_CHANNEL_SPECTATOR_INPUT;
packet_payload = (uint8_t *) &session->spectator_suggested_input_state[63];
} else {
input_packet->channel_and_port = ULNET_CHANNEL_INPUT | ulnet_our_port(session);
packet_payload = (uint8_t *) &session->state[ulnet_our_port(session)];
}

int64_t actual_payload_size = rle8_encode(
(uint8_t *)&session->state[ulnet_our_port(session)],
packet_payload,
sizeof(session->state[0]),
input_packet->coded_state
);

void *next_history_packet = &session->state_packet_history[ulnet_our_port(session)][session->state[ulnet_our_port(session)].frame % ULNET_STATE_PACKET_HISTORY_SIZE];
memset(next_history_packet, 0, sizeof(session->state_packet_history[0][0]));
memcpy(
next_history_packet,
input_packet,
actual_payload_size
);
if ((input_packet->channel_and_port & ULNET_CHANNEL_MASK) == ULNET_CHANNEL_INPUT) {
void *next_history_packet = &session->state_packet_history[ulnet_our_port(session)][session->state[ulnet_our_port(session)].frame % ULNET_STATE_PACKET_HISTORY_SIZE];
memset(next_history_packet, 0, sizeof(session->state_packet_history[0][0]));
memcpy(
next_history_packet,
input_packet,
actual_payload_size
);
}

if (sizeof(ulnet_state_packet_t) + actual_payload_size > ULNET_PACKET_SIZE_BYTES_MAX) {
SAM2_LOG_FATAL("Input packet too large to send");
Expand All @@ -409,11 +437,14 @@ ULNET_LINKAGE int ulnet_poll_session(ulnet_session_t *session, bool force_save_s
juice_state_t state = juice_get_state(session->agent[p]);

// Wait until we can send netplay messages to everyone without fail
if ( state == JUICE_STATE_CONNECTED || state == JUICE_STATE_COMPLETED
&& !ulnet_is_spectator(session, session->our_peer_id)) {
if (state == JUICE_STATE_CONNECTED || state == JUICE_STATE_COMPLETED) {
juice_send(session->agent[p], (const char *) input_packet, sizeof(ulnet_state_packet_t) + actual_payload_size);
SAM2_LOG_DEBUG("Sent input packet for frame %" PRId64 " dest peer_ids[%d]=%" PRIx64,
session->state[SAM2_AUTHORITY_INDEX].frame, p, session->room_we_are_in.peer_ids[p]);
if ((input_packet->channel_and_port & ULNET_CHANNEL_MASK) == ULNET_CHANNEL_INPUT) {
SAM2_LOG_DEBUG("Sent input packet for frame %" PRId64 " dest peer_ids[%d]=%" PRIx64,
session->state[ulnet_our_port(session)].frame, p, session->room_we_are_in.peer_ids[p]);
} else {
SAM2_LOG_DEBUG("Sent spectator input packet dest peer_ids[%d]=%" PRIx64, p, session->room_we_are_in.peer_ids[p]);
}
}
}
}
Expand Down Expand Up @@ -466,7 +497,17 @@ ULNET_LINKAGE int ulnet_poll_session(ulnet_session_t *session, bool force_save_s
int agent_count = 0;
for (int p = 0; p < SAM2_ARRAY_LENGTH(session->agent); p++) {
if (session->agent[p]) {
agent[agent_count++] = session->agent[p];
if (juice_get_state(session->agent[p]) != JUICE_STATE_FAILED) {
agent[agent_count++] = session->agent[p];
} else {
if (p >= SAM2_PORT_MAX+1) {
SAM2_LOG_INFO("Spectator %016" PRIx64 " left" , session->room_we_are_in.peer_ids[p]);
} else {
SAM2_LOG_ERROR("Peer %016" PRIx64 " disconnected before leaving the room this should force a resync which I don't do right now @todo" , session->room_we_are_in.peer_ids[p]);
}

ulnet_disconnect_peer(session, p);
}
}
}

Expand Down Expand Up @@ -786,13 +827,7 @@ static void ulnet__on_state_changed(juice_agent_t *agent, juice_state_t state, v
SAM2_LOG_INFO("Setting peer needs sync bit for peer %016" PRIx64, session->our_peer_id);
session->peer_needs_sync_bitfield |= (1ULL << p);
} else if (state == JUICE_STATE_FAILED) {
if (p >= SAM2_PORT_MAX+1) {
SAM2_LOG_INFO("Spectator %016" PRIx64 " left" , session->room_we_are_in.peer_ids[p]);
ulnet_disconnect_peer(session, p);
} else {

}

//ulnet_disconnect_peer(session, p); // This is called from within juice_user_poll()... So freeing the agent here isn't safe. @todo There might be something I can do about this
}
}

Expand Down Expand Up @@ -852,7 +887,8 @@ static void ulnet_receive_packet_callback(juice_agent_t *agent, const char *data
return;
}

if (p >= SAM2_PORT_MAX+1) {
if ( p >= SAM2_PORT_MAX+1
&& (data[0] & ULNET_CHANNEL_MASK) != ULNET_CHANNEL_SPECTATOR_INPUT) {
SAM2_LOG_WARN("A spectator sent us a UDP packet for unsupported channel %" PRIx8 " for some reason", data[0] & ULNET_CHANNEL_MASK);
return;
}
Expand Down Expand Up @@ -927,6 +963,14 @@ static void ulnet_receive_packet_callback(juice_agent_t *agent, const char *data

break;
}
case ULNET_CHANNEL_SPECTATOR_INPUT: {
rle8_decode(
(const uint8_t *) &data[1], size - 1,
(uint8_t *) &session->spectator_suggested_input_state[p], sizeof(session->spectator_suggested_input_state[p])
);

break;
}
case ULNET_CHANNEL_DESYNC_DEBUG: {
// @todo This channel doesn't receive messages reliably, but I think it should be changed to in the same manner as the input channel
assert(size == sizeof(desync_debug_packet_t));
Expand Down
5 changes: 5 additions & 0 deletions Source/UnrealLibretro/Public/LibretroInputDefinitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ struct FLibretroInputState
int16_t& operator[](ERetroDeviceID RetroDeviceID) { return Data[to_integral(RetroDeviceID)]; }
};

static_assert(std::is_trivially_copyable<FLibretroInputState>::value, "FLibretroInputState should be trivially copyable");
static_assert(std::is_standard_layout<FLibretroInputState>::value, "FLibretroInputState should have a standard layout");
static_assert(sizeof(FLibretroInputState) == sizeof(int16_t) * std::extent<decltype(FLibretroInputState::Data)>::value, "FLibretroInputState size should match the size of the Data array");
static_assert(alignof(FLibretroInputState) == alignof(int16_t), "FLibretroInputState alignment should match the alignment of int16_t");

static_assert(static_cast<size_t>(ERetroDeviceID::Size) < sizeof(FLibretroInputState::Data) / sizeof(std::remove_reference_t<decltype(std::declval<FLibretroInputState>().Data[0])>),
"ERetroDeviceID::Size must be less than the number of elements in FLibretroInputState::Data");

Expand Down
4 changes: 2 additions & 2 deletions netarch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2782,12 +2782,12 @@ int main(int argc, char *argv[]) {
if (next_input_state) {

for (int i = 0; g_binds[i].k || g_binds[i].rk; ++i) {
next_input_state[0][g_binds[i].rk] = g_kbd[g_binds[i].k];
next_input_state[0][g_binds[i].rk] |= g_kbd[g_binds[i].k];
}

if (g_libretro_context.fuzz_input) {
for (int i = 0; i < 16; ++i) {
next_input_state[0][i] = rand() & 0x0001;
next_input_state[0][i] |= rand() & 0x0001;
}
}
}
Expand Down

0 comments on commit ffcdca3

Please sign in to comment.