Pure rust async MQTTv5 client. Fork for using it with an ESP32

MCloudTT/mqrstt

📟 mqrstt

mqrstt is an MQTTv5 client implementation that supports the smol and tokio runtimes. A sync implementation is planned for the future.

Examples

Reconnect with a new stream whenever the network returns an error or a disconnect takes place.

Smol example:

use mqrstt::{
    AsyncClient,
    ConnectOptions,
    new_smol,
    packets::{self, Packet},
    AsyncEventHandlerMut, HandlerStatus, NetworkStatus,
};
use async_trait::async_trait;
use bytes::Bytes;
pub struct PingPong {
    pub client: AsyncClient,
}
#[async_trait]
impl AsyncEventHandlerMut for PingPong {
    // Handlers only get INCOMING packets. This can change later.
    async fn handle(&mut self, event: &packets::Packet) -> () {
        match event {
            Packet::Publish(p) => {
                if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
                    if payload.to_lowercase().contains("ping") {
                        self.client
                            .publish(
                                p.qos,
                                p.retain,
                                p.topic.clone(),
                                Bytes::from_static(b"pong"),
                            )
                            .await
                            .unwrap();
                        println!("Received ping, sent pong!");
                    }
                }
            },
            Packet::ConnAck(_) => { println!("Connected!") },
            _ => (),
        }
    }
}
smol::block_on(async {
    let options = ConnectOptions::new("mqrsttExample".to_string());
    let (mut network, mut handler, client) = new_smol(options);
    let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883))
        .await
        .unwrap();
    network.connect(stream).await.unwrap();
    client.subscribe("mqrstt").await.unwrap();
    let mut pingpong = PingPong {
        client: client.clone(),
    };
    let (n, h, t) = futures::join!(
        async {
            loop {
                return match network.run().await {
                    Ok(NetworkStatus::Active) => continue,
                    otherwise => otherwise,
                };
            }
        },
        async {
            loop {
                return match handler.handle_mut(&mut pingpong).await {
                    Ok(HandlerStatus::Active) => continue,
                    otherwise => otherwise,
                };
            }
        },
        async {
            smol::Timer::after(std::time::Duration::from_secs(60)).await;
            client.disconnect().await.unwrap();
        }
    );
    assert!(n.is_ok());
    assert!(h.is_ok());
});

Tokio example:

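// Reuses the imports and the PingPong handler from the smol example,
// with new_tokio in place of new_smol. Run this inside a tokio runtime.
let options = ConnectOptions::new("TokioTcpPingPong".to_string());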

let (mut network, mut handler, client) = new_tokio(options);

let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883))
  .await
  .unwrap();

network.connect(stream).await.unwrap();

client.subscribe("mqrstt").await.unwrap();

let mut pingpong = PingPong {
  client: client.clone(),
};

let (n, h, _) = tokio::join!(
  async {
    loop {
      return match network.run().await {
        Ok(NetworkStatus::Active) => continue,
        otherwise => otherwise,
      };
    }
  },
  async {
    loop {
      return match handler.handle_mut(&mut pingpong).await {
        Ok(HandlerStatus::Active) => continue,
        otherwise => otherwise,
      };
    }
  },
  async {
    tokio::time::sleep(std::time::Duration::from_secs(60)).await;
    client.disconnect().await.unwrap();
  }
);
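
The reconnect advice at the top of this section can be sketched independently of the runtime. The block below is a minimal, std-only illustration and not part of mqrstt: `SessionEnd`, `backoff_delay` and `run_with_reconnect` are hypothetical names, `connect` stands in for opening a fresh stream (for example a new `TcpStream`), and `run_session` stands in for connecting the network and driving it until it is no longer active. The delays (100 ms base, 5 s cap) are arbitrary assumptions.

```rust
use std::time::Duration;

/// How one connected session ended (hypothetical type, not from mqrstt).
#[derive(Debug, PartialEq)]
pub enum SessionEnd {
    /// The client disconnected on purpose: stop.
    Graceful,
    /// The connection was lost or the network returned an error: reconnect.
    Lost,
}

/// Delay before retry number `retry` (0-based): doubles each time, capped at `max`.
pub fn backoff_delay(retry: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.saturating_pow(retry);
    base.saturating_mul(factor).min(max)
}

/// Keeps opening a NEW stream and running a session on it until a session
/// ends gracefully (returns true) or `max_retries` consecutive connection
/// attempts have failed (returns false).
pub fn run_with_reconnect<S, E>(
    mut connect: impl FnMut() -> Result<S, E>,
    mut run_session: impl FnMut(S) -> SessionEnd,
    mut sleep: impl FnMut(Duration),
    max_retries: u32,
) -> bool {
    let mut retries = 0;
    loop {
        if let Ok(stream) = connect() {
            // A successful connect resets the backoff.
            retries = 0;
            if run_session(stream) == SessionEnd::Graceful {
                return true;
            }
        }
        if retries >= max_retries {
            return false;
        }
        sleep(backoff_delay(
            retries,
            Duration::from_millis(100),
            Duration::from_secs(5),
        ));
        retries += 1;
    }
}
```

In an async program the `sleep` closure would be replaced by the runtime's timer (`smol::Timer::after` or `tokio::time::sleep`, as used in the examples above) and the closures by async blocks. Note that this sketch retries indefinitely as long as connecting keeps succeeding; only consecutive connection failures count towards `max_retries`.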

Important notes:

  • Handlers only get incoming packets.

Size

With the smol runtime you can create very small binaries. A simple PingPong smol TCP client comes to roughly 550 KB, and roughly 1.5 MB with TLS, when built with the following flags. This makes mqrstt extremely useful for embedded devices! :)

[profile.release]
opt-level = "z"  # Optimize for size.
lto = true
codegen-units = 1
strip = true

License

Licensed under

  • Mozilla Public License, Version 2.0 (MPL-2.0)

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, shall be licensed under MPL-2.0, without any additional terms or conditions.
