diff --git a/include/mavlink_interface.h b/include/mavlink_interface.h index 6637cdc..a631998 100644 --- a/include/mavlink_interface.h +++ b/include/mavlink_interface.h @@ -163,14 +163,17 @@ class MavlinkInterface { void SetMavlinkTcpPort(int mavlink_tcp_port) {mavlink_tcp_port_ = mavlink_tcp_port;} void SetMavlinkUdpRemotePort(int mavlink_udp_port) {mavlink_udp_remote_port_ = mavlink_udp_port;} void SetMavlinkUdpLocalPort(int mavlink_udp_port) {mavlink_udp_local_port_ = mavlink_udp_port;} + void SetSecondaryMavlinkUdpLocalPort(int mavlink_udp_port) {secondary_mavlink_udp_local_port_ = mavlink_udp_port;} bool IsRecvBuffEmpty() {return receiver_buffer_.empty();} - bool ReceivedHeartbeats() const { return received_heartbeats_[0] || received_heartbeats_[1]; } + bool ReceivedHeartbeats() const { return received_heartbeats_; } private: bool received_actuator_{false}; bool received_first_actuator_{false}; - bool armed_; + bool armed1_{false}; + bool armed2_{false}; + bool use_redundant_{false}; bool messages_handled_{false}; Eigen::VectorXd input_reference_; @@ -185,12 +188,16 @@ class MavlinkInterface { void ReceiveWorker(); void SendWorker(); + void ProcessReceivedMessage(int ret, char *thrd_name); + static const unsigned n_out_max = 16; int input_index_[n_out_max]; struct sockaddr_in local_simulator_addr_; socklen_t local_simulator_addr_len_; + struct sockaddr_in secondary_local_simulator_addr_; + socklen_t secondary_local_simulator_addr_len_; struct sockaddr_in remote_simulator_addr_; socklen_t remote_simulator_addr_len_; struct sockaddr_in secondary_remote_simulator_addr_; @@ -213,10 +220,11 @@ class MavlinkInterface { std::string secondary_mavlink_addr_str_{"INADDR_ANY"}; int mavlink_udp_remote_port_{kDefaultMavlinkUdpRemotePort}; // MAVLink refers to the PX4 simulator interface here int mavlink_udp_local_port_{kDefaultMavlinkUdpLocalPort}; // MAVLink refers to the PX4 simulator interface here + int secondary_mavlink_udp_local_port_{kDefaultMavlinkUdpLocalPort+1}; int mavlink_tcp_port_{kDefaultMavlinkTcpPort}; // MAVLink refers to the PX4 simulator interface here - int simulator_socket_fd_{0}; + int simulator_second_socket_fd_{0}; int simulator_tcp_client_fd_{0}; bool enable_lockstep_{false}; @@ -251,7 +259,7 @@ class MavlinkInterface { //std::vector> hil_data_; std::atomic gotSigInt_ {false}; - bool received_heartbeats_[2] {false, false}; + bool received_heartbeats_{false}; std::mutex receiver_buff_mtx_; std::queue> receiver_buffer_; diff --git a/src/gazebo_mavlink_interface.cpp b/src/gazebo_mavlink_interface.cpp index b2ce68d..67a8107 100644 --- a/src/gazebo_mavlink_interface.cpp +++ b/src/gazebo_mavlink_interface.cpp @@ -194,6 +194,11 @@ void GazeboMavlinkInterface::Configure(const gz::sim::Entity &_entity, mavlink_interface_->SetMavlinkUdpLocalPort(mavlink_udp_local_port); } + if (_sdf->HasElement("secondary_mavlink_udp_local_port")) { + int mavlink_udp_local_port = _sdf->Get("secondary_mavlink_udp_local_port"); + mavlink_interface_->SetSecondaryMavlinkUdpLocalPort(mavlink_udp_local_port); + } + if (_sdf->HasElement("mavlink_tcp_port")) { int mavlink_tcp_port = _sdf->Get("mavlink_tcp_port"); mavlink_interface_->SetMavlinkTcpPort(mavlink_tcp_port); diff --git a/src/mavlink_interface.cpp b/src/mavlink_interface.cpp index 771aa6b..120a779 100644 --- a/src/mavlink_interface.cpp +++ b/src/mavlink_interface.cpp @@ -19,6 +19,13 @@ void MavlinkInterface::Load() abort(); } } + if (secondary_mavlink_addr_str_ != "INADDR_ANY") { + secondary_mavlink_addr_ = inet_addr(secondary_mavlink_addr_str_.c_str()); + if (secondary_mavlink_addr_ == INADDR_NONE) { + std::cerr << "Invalid secondary_mavlink_addr: " << secondary_mavlink_addr_ << ", aborting" << std::endl; + abort(); + } + } // initialize sender status to zero memset((char *)&sender_m_status_, 0, sizeof(sender_m_status_)); @@ -35,6 +42,10 @@ void MavlinkInterface::Load() local_simulator_addr_.sin_family = AF_INET; local_simulator_addr_len_ = sizeof(local_simulator_addr_); + memset((char *)&secondary_local_simulator_addr_, 0, sizeof(secondary_local_simulator_addr_)); + secondary_local_simulator_addr_.sin_family = AF_INET; + secondary_local_simulator_addr_len_ = sizeof(secondary_local_simulator_addr_); + if (use_tcp_) { if ((simulator_socket_fd_ = socket(AF_INET, SOCK_STREAM, 0)) < 0) { @@ -105,14 +116,18 @@ void MavlinkInterface::Load() } } else { // When connecting to SITL, we specify the port where the mavlink traffic originates from. + // Remote remote_simulator_addr_.sin_addr.s_addr = mavlink_addr_; remote_simulator_addr_.sin_port = htons(mavlink_udp_remote_port_); secondary_remote_simulator_addr_.sin_addr.s_addr = secondary_mavlink_addr_; secondary_remote_simulator_addr_.sin_port = htons(mavlink_udp_remote_port_); + // Local local_simulator_addr_.sin_addr.s_addr = htonl(INADDR_ANY); local_simulator_addr_.sin_port = htons(mavlink_udp_local_port_); + secondary_local_simulator_addr_.sin_addr.s_addr = htonl(INADDR_ANY); + secondary_local_simulator_addr_.sin_port = htons(secondary_mavlink_udp_local_port_); - std::cout << "Creating UDP socket for SITL input on local port : " << mavlink_udp_local_port_ << " and remote port " << mavlink_udp_remote_port_ << std::endl; + std::cout << "Creating UDP socket for HITL input on local port : " << mavlink_udp_local_port_ << " and remote port " << mavlink_udp_remote_port_ << std::endl; if ((simulator_socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { std::cerr << "Creating UDP socket failed: " << strerror(errno) << ", aborting" << std::endl; @@ -124,10 +139,21 @@ void MavlinkInterface::Load() abort(); } + std::cout << "Creating secondary UDP socket for HITL input on local port : " << secondary_mavlink_udp_local_port_ << " and remote port " << mavlink_udp_remote_port_ << std::endl; + + if ((simulator_second_socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { + std::cerr << "Creating secondary UDP socket failed: " << strerror(errno) << ", aborting" << std::endl; + abort(); + } + + if (bind(simulator_second_socket_fd_, (struct sockaddr *)&secondary_local_simulator_addr_, secondary_local_simulator_addr_len_) < 0) { + std::cerr << "secondary bind failed: " << strerror(errno) << ", aborting" << std::endl; + abort(); + } + memset(fds_, 0, sizeof(fds_)); fds_[CONNECTION_FD].fd = simulator_socket_fd_; fds_[CONNECTION_FD].events = POLLIN | POLLOUT; // read/write - } // Start mavlink message receiver thread @@ -155,6 +181,54 @@ std::shared_ptr MavlinkInterface::PopRecvMessage() { return msg; } +void MavlinkInterface::ProcessReceivedMessage(int ret, char *thrd_name) { + + if (ret < 0) { + std::cerr << "[" << thrd_name << "] recv error: " << strerror(errno) << std::endl; + if (errno == ECONNRESET) { + close_conn_ = true; + } + return; + } else if (ret == 0) { + std::cerr << "[" << thrd_name << "] no data received after select trigger.." << std::endl; + // No data.. + return; + } + + // data received + int len = ret; + mavlink_status_t status; + mavlink_message_t message; + + for(int i = 0; i < len; i++) + { + auto msg_received = static_cast(mavlink_frame_char_buffer(&m_buffer_, &m_status_, buf_[i], &message, &status)); + if (msg_received == Framing::bad_crc || msg_received == Framing::bad_signature) { + _mav_parse_error(&m_status_); + m_status_.msg_received = MAVLINK_FRAMING_INCOMPLETE; + m_status_.parse_state = MAVLINK_PARSE_STATE_IDLE; + if (buf_[i] == MAVLINK_STX) { + m_status_.parse_state = MAVLINK_PARSE_STATE_GOT_STX; + m_buffer_.len = 0; + mavlink_start_checksum(&m_buffer_); + } + } + + if (msg_received != Framing::incomplete) { + auto msg = std::make_shared(message); + const std::lock_guard guard(receiver_buff_mtx_); + if (receiver_buffer_.size() > kMaxRecvBufferSize) { + std::cerr << "[" << thrd_name << "] Messages buffer overflow!" << std::endl; + // clear the buffer + while (!receiver_buffer_.empty()) { + receiver_buffer_.pop(); + } + } + receiver_buffer_.push(msg); + } + } +} + void MavlinkInterface::ReceiveWorker() { struct sockaddr_in remote_addr; socklen_t remote_addr_len = sizeof(remote_addr); @@ -165,71 +239,55 @@ void MavlinkInterface::ReceiveWorker() { std::cout << "[" << thrd_name << "] starts" << std::endl; + /* // Wait for connection: if ((fds_[CONNECTION_FD].fd <= 0) && use_tcp_) { // Client mode std::cout << "[" << thrd_name << "] Wait for TCP connection.." << std::endl; while (fds_[CONNECTION_FD].fd <= 0) { - if (!tcp_client_mode_) acceptConnections(); - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + if (!tcp_client_mode_) acceptConnections(); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); } std::cout << "[" << thrd_name << "] TCP connection detected" << std::endl; } - + */ std::cout << "[" << thrd_name << "] Start receiving..." << std::endl; + fd_set readfds; + int maxfd = std::max(simulator_socket_fd_, simulator_second_socket_fd_) + 1; + while(!close_conn_ && !gotSigInt_) { - int ret = recvfrom(fds_[CONNECTION_FD].fd, buf_, sizeof(buf_), 0, (struct sockaddr *)&remote_addr, &remote_addr_len); + FD_ZERO(&readfds); + FD_SET(simulator_socket_fd_, &readfds); + FD_SET(simulator_second_socket_fd_, &readfds); + + struct timeval tv = {1, 0}; // 1 second timeout + int ret = select(maxfd, &readfds, nullptr, nullptr, &tv); if (ret < 0) { - std::cerr << "[" << thrd_name << "] recvfrom error: " << strerror(errno) << std::endl; + std::cerr << "[" << thrd_name << "] recv error: " << strerror(errno) << std::endl; if (errno == ECONNRESET) { close_conn_ = true; } continue; - } - - // client closed the connection orderly, only makes sense on tcp - if (use_tcp_ && ret == 0) { - std::cerr << "[" << thrd_name << "] Connection closed by client." << std::endl; - close_conn_ = true; + } else if (ret == 0) { + // Timeout continue; } - // data received - int len = ret; - mavlink_status_t status; - mavlink_message_t message; - - for(int i = 0; i < len; i++) - { - auto msg_received = static_cast(mavlink_frame_char_buffer(&m_buffer_, &m_status_, buf_[i], &message, &status)); - if (msg_received == Framing::bad_crc || msg_received == Framing::bad_signature) { - _mav_parse_error(&m_status_); - m_status_.msg_received = MAVLINK_FRAMING_INCOMPLETE; - m_status_.parse_state = MAVLINK_PARSE_STATE_IDLE; - if (buf_[i] == MAVLINK_STX) { - m_status_.parse_state = MAVLINK_PARSE_STATE_GOT_STX; - m_buffer_.len = 0; - mavlink_start_checksum(&m_buffer_); - } - } - - if (msg_received != Framing::incomplete) { - auto msg = std::make_shared(message); - const std::lock_guard guard(receiver_buff_mtx_); - if (receiver_buffer_.size() > kMaxRecvBufferSize) { - std::cerr << "[" << thrd_name << "] Messages buffer overflow!" << std::endl; - // clear the buffer - while (!receiver_buffer_.empty()) { - receiver_buffer_.pop(); - } - } - receiver_buffer_.push(msg); - } + if (FD_ISSET(simulator_socket_fd_, &readfds)) { + // Receive data from sock1 + int ret = recvfrom(simulator_socket_fd_, buf_, sizeof(buf_), 0, (struct sockaddr *)&remote_addr, &remote_addr_len); + // ... process data + ProcessReceivedMessage(ret, thrd_name); + } + if (FD_ISSET(simulator_second_socket_fd_, &readfds)) { + // Receive data from sock2 + int ret = recvfrom(simulator_second_socket_fd_, buf_, sizeof(buf_), 0, (struct sockaddr *)&remote_addr, &remote_addr_len); + // ... process data + ProcessReceivedMessage(ret, thrd_name); } } std::cout << "The thread [" << thrd_name << "] was shutdown." << std::endl; - } /******************************************************* @@ -476,43 +534,122 @@ void MavlinkInterface::handle_message(mavlink_message_t *msg) void MavlinkInterface::handle_heartbeat(mavlink_message_t *msg) { + /* + char thrd_name[64] = {0}; + sprintf(thrd_name, "thr_%d", gettid()); + pthread_setname_np(pthread_self(), thrd_name); + + mavlink_heartbeat_t hbeat; + mavlink_msg_heartbeat_decode(msg, &hbeat); + //bool armed = (hbeat.system_status == MAV_STATE_ACTIVE || hbeat.system_status == MAV_STATE_CRITICAL); + int status = (int) hbeat.system_status; + int compid = (int) msg->compid; + + std::cout << "HB from " << compid << " armed:" << (armed ? "ARMED" : "DISARMED") << " status:" << status << + " *armed1_:" << static_cast(&armed1_) << " *armed2_:" << static_cast(&armed2_) << std::endl; + if (msg->compid == 1) { - received_heartbeats_[0] = true; + // Update from FC1 + // armed if state is either ACTIVE or CRITICAL, otherwise disarmed + if (armed1_ != armed) { + armed1_ = armed; + std::cout << thrd_name << " FC1: " << (armed ? "ARMED" : "DISARMED") << "(" << status << ")" + << " => STATE [" << (armed1_ ? "ARMED" : "DISARMED") + << ", " << (armed2_ ? "ARMED" : "DISARMED") << "]" << std::endl; + } + + // If FC1 is arming, make sure we use FC1 + if (armed) { + if (use_redundant_) { + std::cout << "Primary FC1 healthy and arming => Use primary FC1!" << std::endl; + use_redundant_ = false; + } + } } else if (msg->compid == 2) { - received_heartbeats_[1] = true; + // Update from FC2 + // armed if state is either ACTIVE or CRITICAL, otherwise disarmed + if (armed2_ != armed) { + armed2_ = armed; + std::cout << thrd_name << " FC2: " << (armed ? "ARMED" : "DISARMED") << "(" << status << ")" + << " => STATE [" << (armed1_ ? "ARMED" : "DISARMED") + << ", " << (armed2_ ? "ARMED" : "DISARMED") << "]" << std::endl; + } + } else { + std::cout << "HB from unknown source" << std::endl; + return; + } + + received_heartbeats_ = true; + + if (!armed1_ && armed2_) { + use_redundant_ = true; + std::cout << "Primary FC1 disarmed while redundant still armed => Use redundant FC2!" << std::endl; + } + + */ + received_heartbeats_ = true; +} + +bool is_running(int fc, mavlink_hil_actuator_controls_t &controls) { + for (int i=0; i<4; i++) { + std::cout << "FC" << fc << ": motor[" << i << "]: " << controls.controls[i] << std::endl; + if (controls.controls[i] > 0.001) { + return true; + } } + return false; } void MavlinkInterface::handle_actuator_controls(mavlink_message_t *msg) { static int consecutive_spare_msg = 0; const std::lock_guard lock(actuator_mutex_); + mavlink_hil_actuator_controls_t controls; + mavlink_msg_hil_actuator_controls_decode(msg, &controls); if (msg->compid == 2) { + // Message from redundant FC + armed2_ = is_running(2, controls); + //armed2_ = (controls.mode & MAV_MODE_FLAG_SAFETY_ARMED) != 0; + + //std::cout << "secondary FP2: " << (armed2_ ? "ARMED" : "DISARMED") << std::endl; if (consecutive_spare_msg < MAX_CONSECUTIVE_SPARE_MSG) { - if (consecutive_spare_msg > 4) { - std::cout << "spare " << consecutive_spare_msg << std::endl; + if (consecutive_spare_msg > (MAX_CONSECUTIVE_SPARE_MSG/2)) { + std::cout << "FC1 missing count " << consecutive_spare_msg << std::endl; } consecutive_spare_msg++; if (consecutive_spare_msg == MAX_CONSECUTIVE_SPARE_MSG) { - std::cout << "Primary controller not updating actuators => switch to backup controller" << std::endl; - } else { - return; + use_redundant_ = true; + std::cout << "Primary FC1 timeout => Use redundant FC2" << std::endl; } } - } else if (msg->compid == 1) { - if (consecutive_spare_msg == MAX_CONSECUTIVE_SPARE_MSG) { - // Already switched to backup controller, ignore primary controller messages + + if (!use_redundant_) { return; - } else { - consecutive_spare_msg = 0; } - } - mavlink_hil_actuator_controls_t controls; - mavlink_msg_hil_actuator_controls_decode(msg, &controls); + } else if (msg->compid == 1) { + // Message from primary FC + armed1_ = is_running(1, controls); + //armed1_ = (controls.mode & MAV_MODE_FLAG_SAFETY_ARMED) != 0; + //std::cout << "Primary FC1: " << (armed1_ ? "ARMED" : "DISARMED") << std::endl; + if (armed1_) { + if (use_redundant_) { + use_redundant_ = false; + std::cout << "Primary FC1 armed => Use primary FC1" << std::endl; + } + } else { + if (!use_redundant_) { + std::cout << "Primary FC1 disarmed => Use redundant FC2" << std::endl; + } + use_redundant_ = true; + } + consecutive_spare_msg = 0; - armed_ = (controls.mode & MAV_MODE_FLAG_SAFETY_ARMED); + if (use_redundant_) { + return; + } + } for (unsigned i = 0; i < n_out_max; i++) { input_index_[i] = i; @@ -544,8 +681,8 @@ void MavlinkInterface::send_mavlink_message(const mavlink_message_t *message) len = send(fds_[CONNECTION_FD].fd, buffer, packetlen, 0); } else { ssize_t len2; - len = sendto(fds_[CONNECTION_FD].fd, buffer, packetlen, 0, (struct sockaddr *)&remote_simulator_addr_, remote_simulator_addr_len_); - len2 = sendto(fds_[CONNECTION_FD].fd, buffer, packetlen, 0, (struct sockaddr *)&secondary_remote_simulator_addr_, secondary_remote_simulator_addr_len_); + len = sendto(simulator_socket_fd_, buffer, packetlen, 0, (struct sockaddr *)&remote_simulator_addr_, remote_simulator_addr_len_); + len2 = sendto(simulator_second_socket_fd_, buffer, packetlen, 0, (struct sockaddr *)&secondary_remote_simulator_addr_, secondary_remote_simulator_addr_len_); if (len < 0 && len2 < 0) { // neither one worked => error len = -1; @@ -603,7 +740,7 @@ Eigen::VectorXd MavlinkInterface::GetActuatorControls() { bool MavlinkInterface::GetArmedState() { const std::lock_guard lock(actuator_mutex_); - return armed_; + return (armed1_ || armed2_); } // Mavlink helper function to finalize message without global channel status