From ffcdca3cdc0b537c69c994623bb086b328ba7edb Mon Sep 17 00:00:00 2001 From: John Rehbein Date: Fri, 7 Jun 2024 20:24:59 -0700 Subject: [PATCH] Implement client-server input handling - This incurs round-trip latency rather than 1-way - This is needed in UnrealLibretro because it will yield nice out-of-the-box behavior --- .../Private/LibretroContext.cpp | 4 +- .../UnrealLibretro/Private/LibretroContext.h | 1 + .../Private/LibretroCoreInstance.cpp | 4 +- Source/UnrealLibretro/Private/ulnet.h | 98 ++++++++++++++----- .../Public/LibretroInputDefinitions.h | 5 + netarch.cpp | 4 +- 6 files changed, 83 insertions(+), 33 deletions(-) diff --git a/Source/UnrealLibretro/Private/LibretroContext.cpp b/Source/UnrealLibretro/Private/LibretroContext.cpp index 428ba3a..2616dbc 100644 --- a/Source/UnrealLibretro/Private/LibretroContext.cpp +++ b/Source/UnrealLibretro/Private/LibretroContext.cpp @@ -987,7 +987,7 @@ void FLibretroContext::load(const char *sofile) { libretro_callbacks->input_state = [this](unsigned port, unsigned device, unsigned index, unsigned id) { return core_input_state(port, device, index, id); }; libretro_callbacks->input_poll = [this]() { memset(InputState, 0, sizeof(InputState)); // @todo To query the input for sparse packets rather than this laborious subroutine - ulnet_input_poll(netplay_session, (ulnet_input_state_t (*)[ULNET_PORT_COUNT]) &InputState); // @todo this should be done in the next procedure also make + ulnet_input_poll(netplay_session, (ulnet_input_state_t (*)[ULNET_PORT_COUNT]) &InputState); // @todo Cast violates strict-aliasing }; libretro_callbacks->environment = [this](unsigned cmd, void* data) { return core_environment(cmd, data); }; libretro_callbacks->get_current_framebuffer = [this]() { return core.gl.framebuffer; }; @@ -1197,7 +1197,7 @@ FLibretroContext* FLibretroContext::Launch(ULibretroCoreInstance* LibretroCoreIn ulnet_core_option_t option = { 0 }; auto state = ulnet_query_generate_next_input(l->netplay_session, &option); if (state) { - memset(state, 0, sizeof(*state)); + memcpy(state, l->NextInputState, sizeof(*state)); } l->netplay_save_state_size = l->libretro_api.serialize_size(); diff --git a/Source/UnrealLibretro/Private/LibretroContext.h b/Source/UnrealLibretro/Private/LibretroContext.h index f02c12a..1d78429 100644 --- a/Source/UnrealLibretro/Private/LibretroContext.h +++ b/Source/UnrealLibretro/Private/LibretroContext.h @@ -127,6 +127,7 @@ struct FLibretroContext { * This is what the libretro core reads from when determining input. If you want to use your own input method you can modify this directly. */ FLibretroInputState InputState[PortCount]; + FLibretroInputState NextInputState[PortCount]; std::atomic OptionsHaveBeenModified; TArray> OptionSelectedIndex; diff --git a/Source/UnrealLibretro/Private/LibretroCoreInstance.cpp b/Source/UnrealLibretro/Private/LibretroCoreInstance.cpp index ea41783..daa4d59 100644 --- a/Source/UnrealLibretro/Private/LibretroCoreInstance.cpp +++ b/Source/UnrealLibretro/Private/LibretroCoreInstance.cpp @@ -337,7 +337,7 @@ void ULibretroCoreInstance::SetInputDigital(int Port, bool Pressed, ERetroDevice CoreInstance.GetValue()->EnqueueTask([=, CoreInstance = CoreInstance.GetValue()](auto) { - CoreInstance->InputState[Port][Input] = Pressed; + CoreInstance->NextInputState[Port][Input] = Pressed; }); } @@ -347,7 +347,7 @@ void ULibretroCoreInstance::SetInputAnalog(int Port, int _16BitSignedInteger, ER CoreInstance.GetValue()->EnqueueTask([=, CoreInstance = CoreInstance.GetValue()](auto) { - CoreInstance->InputState[Port][Input] = _16BitSignedInteger; + CoreInstance->NextInputState[Port][Input] = _16BitSignedInteger; }); } diff --git a/Source/UnrealLibretro/Private/ulnet.h b/Source/UnrealLibretro/Private/ulnet.h index a186731..8c82420 100644 --- a/Source/UnrealLibretro/Private/ulnet.h +++ b/Source/UnrealLibretro/Private/ulnet.h @@ -31,7 +31,7 @@ #define ULNET_CHANNEL_EXTRA 0x00 #define ULNET_CHANNEL_INPUT 0x10 -#define ULNET_CHANNEL_INPUT_AUDIT_CONSISTENCY 0x20 +#define ULNET_CHANNEL_SPECTATOR_INPUT 0x20 #define ULNET_CHANNEL_SAVESTATE_TRANSFER 0x30 #define ULNET_CHANNEL_DESYNC_DEBUG 0xF0 @@ -184,6 +184,7 @@ typedef struct ulnet_session { juice_agent_t *agent [SAM2_PORT_MAX + 1 /* Plus Authority */ + ULNET_SPECTATOR_MAX]; int64_t peer_desynced_frame [SAM2_PORT_MAX + 1 /* Plus Authority */ + ULNET_SPECTATOR_MAX]; ulnet_state_t state [SAM2_PORT_MAX + 1 /* Plus Authority */]; + ulnet_input_state_t spectator_suggested_input_state[SAM2_PORT_MAX + 1 /* Plus Authority */ + ULNET_SPECTATOR_MAX][ULNET_PORT_COUNT]; unsigned char state_packet_history[SAM2_PORT_MAX + 1 /* Plus Authority */][ULNET_STATE_PACKET_HISTORY_SIZE][ULNET_PACKET_SIZE_BYTES_MAX]; uint64_t peer_needs_sync_bitfield; @@ -222,7 +223,9 @@ static inline int ulnet_our_port(ulnet_session_t *session) { // @todo There is a bug here where we are sending out packets as the authority when we are not the authority if (session->room_we_are_in.flags & SAM2_FLAG_ROOM_IS_NETWORK_HOSTED) { int port = sam2_get_port_of_peer(&session->room_we_are_in, session->our_peer_id); - assert(port != -1); + if (port == -1) { + SAM2_LOG_FATAL("Our peer ID %016" PRIx64 " not found in room", session->our_peer_id); + } return port; } else { @@ -322,7 +325,22 @@ ULNET_LINKAGE ulnet_input_state_t (*ulnet_query_generate_next_input(ulnet_sessio memset(&session->next_room_xor_delta, 0, sizeof(session->next_room_xor_delta)); //} + // Incoporate input from spectators into our input. This has the drawback of round trip latency but requires a single connection to the server + memset(session->state[ulnet_our_port(session)].input_state[next_buffer_index], 0, sizeof(session->state[ulnet_our_port(session)].input_state[next_buffer_index])); + for (int i = 0; i < SAM2_ARRAY_LENGTH(session->agent); i++) { + if (session->agent[i]) { + for (int p = 0; p < SAM2_PORT_MAX; p++) { + for (int j = 0; j < SAM2_ARRAY_LENGTH(session->state[ulnet_our_port(session)].input_state[next_buffer_index][p]); j++) { + session->state[ulnet_our_port(session)].input_state[next_buffer_index][p][j] |= session->spectator_suggested_input_state[i][p][j]; + } + } + } + } + return &session->state[ulnet_our_port(session)].input_state[next_buffer_index]; + } else if (ulnet_is_spectator(session, session->our_peer_id)) { + memset(session->spectator_suggested_input_state[63], 0, sizeof(session->spectator_suggested_input_state[63])); + return &session->spectator_suggested_input_state[63]; } return NULL; @@ -381,24 +399,34 @@ ULNET_LINKAGE int ulnet_poll_session(ulnet_session_t *session, bool force_save_s int status = 0; session->retro_unserialize = retro_unserialize; // If used this is invoked through a callback within this function call - if ( !ulnet_is_spectator(session, session->our_peer_id) - && session->room_we_are_in.flags & SAM2_FLAG_ROOM_IS_NETWORK_HOSTED) { + if (session->room_we_are_in.flags & SAM2_FLAG_ROOM_IS_NETWORK_HOSTED) { uint8_t _[RLE8_ENCODE_UPPER_BOUND(ULNET_PACKET_SIZE_BYTES_MAX)]; - ulnet_state_packet_t *input_packet = (ulnet_state_packet_t *) _; - input_packet->channel_and_port = ULNET_CHANNEL_INPUT | ulnet_our_port(session); + ulnet_state_packet_t *input_packet = (ulnet_state_packet_t *) _; // @todo Strict-aliasing + + uint8_t *packet_payload; + if (ulnet_is_spectator(session, session->our_peer_id)) { + input_packet->channel_and_port = ULNET_CHANNEL_SPECTATOR_INPUT; + packet_payload = (uint8_t *) &session->spectator_suggested_input_state[63]; + } else { + input_packet->channel_and_port = ULNET_CHANNEL_INPUT | ulnet_our_port(session); + packet_payload = (uint8_t *) &session->state[ulnet_our_port(session)]; + } + int64_t actual_payload_size = rle8_encode( - (uint8_t *)&session->state[ulnet_our_port(session)], + packet_payload, sizeof(session->state[0]), input_packet->coded_state ); - void *next_history_packet = &session->state_packet_history[ulnet_our_port(session)][session->state[ulnet_our_port(session)].frame % ULNET_STATE_PACKET_HISTORY_SIZE]; - memset(next_history_packet, 0, sizeof(session->state_packet_history[0][0])); - memcpy( - next_history_packet, - input_packet, - actual_payload_size - ); + if ((input_packet->channel_and_port & ULNET_CHANNEL_MASK) == ULNET_CHANNEL_INPUT) { + void *next_history_packet = &session->state_packet_history[ulnet_our_port(session)][session->state[ulnet_our_port(session)].frame % ULNET_STATE_PACKET_HISTORY_SIZE]; + memset(next_history_packet, 0, sizeof(session->state_packet_history[0][0])); + memcpy( + next_history_packet, + input_packet, + actual_payload_size + ); + } if (sizeof(ulnet_state_packet_t) + actual_payload_size > ULNET_PACKET_SIZE_BYTES_MAX) { SAM2_LOG_FATAL("Input packet too large to send"); @@ -409,11 +437,14 @@ ULNET_LINKAGE int ulnet_poll_session(ulnet_session_t *session, bool force_save_s juice_state_t state = juice_get_state(session->agent[p]); // Wait until we can send netplay messages to everyone without fail - if ( state == JUICE_STATE_CONNECTED || state == JUICE_STATE_COMPLETED - && !ulnet_is_spectator(session, session->our_peer_id)) { + if (state == JUICE_STATE_CONNECTED || state == JUICE_STATE_COMPLETED) { juice_send(session->agent[p], (const char *) input_packet, sizeof(ulnet_state_packet_t) + actual_payload_size); - SAM2_LOG_DEBUG("Sent input packet for frame %" PRId64 " dest peer_ids[%d]=%" PRIx64, - session->state[SAM2_AUTHORITY_INDEX].frame, p, session->room_we_are_in.peer_ids[p]); + if ((input_packet->channel_and_port & ULNET_CHANNEL_MASK) == ULNET_CHANNEL_INPUT) { + SAM2_LOG_DEBUG("Sent input packet for frame %" PRId64 " dest peer_ids[%d]=%" PRIx64, + session->state[ulnet_our_port(session)].frame, p, session->room_we_are_in.peer_ids[p]); + } else { + SAM2_LOG_DEBUG("Sent spectator input packet dest peer_ids[%d]=%" PRIx64, p, session->room_we_are_in.peer_ids[p]); + } } } } @@ -466,7 +497,17 @@ ULNET_LINKAGE int ulnet_poll_session(ulnet_session_t *session, bool force_save_s int agent_count = 0; for (int p = 0; p < SAM2_ARRAY_LENGTH(session->agent); p++) { if (session->agent[p]) { - agent[agent_count++] = session->agent[p]; + if (juice_get_state(session->agent[p]) != JUICE_STATE_FAILED) { + agent[agent_count++] = session->agent[p]; + } else { + if (p >= SAM2_PORT_MAX+1) { + SAM2_LOG_INFO("Spectator %016" PRIx64 " left" , session->room_we_are_in.peer_ids[p]); + } else { + SAM2_LOG_ERROR("Peer %016" PRIx64 " disconnected before leaving the room this should force a resync which I don't do right now @todo" , session->room_we_are_in.peer_ids[p]); + } + + ulnet_disconnect_peer(session, p); + } } } @@ -786,13 +827,7 @@ static void ulnet__on_state_changed(juice_agent_t *agent, juice_state_t state, v SAM2_LOG_INFO("Setting peer needs sync bit for peer %016" PRIx64, session->our_peer_id); session->peer_needs_sync_bitfield |= (1ULL << p); } else if (state == JUICE_STATE_FAILED) { - if (p >= SAM2_PORT_MAX+1) { - SAM2_LOG_INFO("Spectator %016" PRIx64 " left" , session->room_we_are_in.peer_ids[p]); - ulnet_disconnect_peer(session, p); - } else { - - } - + //ulnet_disconnect_peer(session, p); // This is called from within juice_user_poll()... So freeing the agent here isn't safe. @todo There might be something I can do about this } } @@ -852,7 +887,8 @@ static void ulnet_receive_packet_callback(juice_agent_t *agent, const char *data return; } - if (p >= SAM2_PORT_MAX+1) { + if ( p >= SAM2_PORT_MAX+1 + && (data[0] & ULNET_CHANNEL_MASK) != ULNET_CHANNEL_SPECTATOR_INPUT) { SAM2_LOG_WARN("A spectator sent us a UDP packet for unsupported channel %" PRIx8 " for some reason", data[0] & ULNET_CHANNEL_MASK); return; } @@ -927,6 +963,14 @@ static void ulnet_receive_packet_callback(juice_agent_t *agent, const char *data break; } + case ULNET_CHANNEL_SPECTATOR_INPUT: { + rle8_decode( + (const uint8_t *) &data[1], size - 1, + (uint8_t *) &session->spectator_suggested_input_state[p], sizeof(session->spectator_suggested_input_state[p]) + ); + + break; + } case ULNET_CHANNEL_DESYNC_DEBUG: { // @todo This channel doesn't receive messages reliably, but I think it should be changed to in the same manner as the input channel assert(size == sizeof(desync_debug_packet_t)); diff --git a/Source/UnrealLibretro/Public/LibretroInputDefinitions.h b/Source/UnrealLibretro/Public/LibretroInputDefinitions.h index da764e8..2d8ab31 100644 --- a/Source/UnrealLibretro/Public/LibretroInputDefinitions.h +++ b/Source/UnrealLibretro/Public/LibretroInputDefinitions.h @@ -135,6 +135,11 @@ struct FLibretroInputState int16_t& operator[](ERetroDeviceID RetroDeviceID) { return Data[to_integral(RetroDeviceID)]; } }; +static_assert(std::is_trivially_copyable::value, "FLibretroInputState should be trivially copyable"); +static_assert(std::is_standard_layout::value, "FLibretroInputState should have a standard layout"); +static_assert(sizeof(FLibretroInputState) == sizeof(int16_t) * std::extent::value, "FLibretroInputState size should match the size of the Data array"); +static_assert(alignof(FLibretroInputState) == alignof(int16_t), "FLibretroInputState alignment should match the alignment of int16_t"); + static_assert(static_cast(ERetroDeviceID::Size) < sizeof(FLibretroInputState::Data) / sizeof(std::remove_reference_t().Data[0])>), "ERetroDeviceID::Size must be less than the number of elements in FLibretroInputState::Data"); diff --git a/netarch.cpp b/netarch.cpp index 84a2fd7..f403bb3 100644 --- a/netarch.cpp +++ b/netarch.cpp @@ -2782,12 +2782,12 @@ int main(int argc, char *argv[]) { if (next_input_state) { for (int i = 0; g_binds[i].k || g_binds[i].rk; ++i) { - next_input_state[0][g_binds[i].rk] = g_kbd[g_binds[i].k]; + next_input_state[0][g_binds[i].rk] |= g_kbd[g_binds[i].k]; } if (g_libretro_context.fuzz_input) { for (int i = 0; i < 16; ++i) { - next_input_state[0][i] = rand() & 0x0001; + next_input_state[0][i] |= rand() & 0x0001; } } }