Skip to content

Commit

Permalink
Merge pull request #152 from quartiq/rs/fnmut
Browse files Browse the repository at this point in the history
Elevating ToPayload to FnOnce, adding ToPayload for &str
  • Loading branch information
ryan-summers authored Sep 13, 2023
2 parents 0932655 + 2158721 commit cec9ce7
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 23 deletions.
9 changes: 6 additions & 3 deletions src/mqtt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,13 +410,16 @@ impl<'buf, TcpStack: TcpClientStack, Clock: embedded_time::Clock, Broker: crate:
.replace(self.sm.context_mut().session_state.get_packet_identifier());
}

let packet = self.network.send_pub(&publish)?;
let id = publish.packet_id;
let qos = publish.qos;

if let Some(id) = publish.packet_id {
let packet = self.network.send_pub(publish)?;

if let Some(id) = id {
let context = self.sm.context_mut();
context
.session_state
.handle_publish(publish.qos, id, packet)
.handle_publish(qos, id, packet)
.map_err(|e| Error::Minimq(e.into()))?;
context.send_quota = context.send_quota.checked_sub(1).unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion src/network_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ where

pub fn send_pub<P: crate::publication::ToPayload>(
&mut self,
pub_packet: &Pub<'_, P>,
pub_packet: Pub<'_, P>,
) -> Result<&[u8], crate::PubError<TcpStack::Error, P::Error>> {
// If there's an unfinished write pending, it's invalid to try to write new data. The
// previous write must first be completed.
Expand Down
8 changes: 4 additions & 4 deletions src/packets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ mod tests {
};

let mut buffer: [u8; 900] = [0; 900];
let message = MqttSerializer::pub_to_buffer(&mut buffer, &publish).unwrap();
let message = MqttSerializer::pub_to_buffer(&mut buffer, publish).unwrap();

assert_eq!(message, good_publish);
}
Expand All @@ -311,7 +311,7 @@ mod tests {
};

let mut buffer: [u8; 900] = [0; 900];
let message = MqttSerializer::pub_to_buffer(&mut buffer, &publish).unwrap();
let message = MqttSerializer::pub_to_buffer(&mut buffer, publish).unwrap();

assert_eq!(message, good_publish);
}
Expand Down Expand Up @@ -363,7 +363,7 @@ mod tests {
};

let mut buffer: [u8; 900] = [0; 900];
let message = MqttSerializer::pub_to_buffer(&mut buffer, &publish).unwrap();
let message = MqttSerializer::pub_to_buffer(&mut buffer, publish).unwrap();

assert_eq!(message, good_publish);
}
Expand Down Expand Up @@ -395,7 +395,7 @@ mod tests {
};

let mut buffer: [u8; 900] = [0; 900];
let message = MqttSerializer::pub_to_buffer(&mut buffer, &publish).unwrap();
let message = MqttSerializer::pub_to_buffer(&mut buffer, publish).unwrap();

assert_eq!(message, good_publish);
}
Expand Down
27 changes: 17 additions & 10 deletions src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ use crate::{

pub trait ToPayload {
type Error;
fn serialize(&self, buffer: &mut [u8]) -> Result<usize, Self::Error>;
fn serialize(self, buffer: &mut [u8]) -> Result<usize, Self::Error>;
}

impl<'a> ToPayload for &'a [u8] {
type Error = ();

fn serialize(&self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
fn serialize(self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
if buffer.len() < self.len() {
return Err(());
}
Expand All @@ -22,34 +22,41 @@ impl<'a> ToPayload for &'a [u8] {
}
}

impl<const N: usize> ToPayload for [u8; N] {
impl<'a> ToPayload for &'a str {
type Error = ();

fn serialize(&self, buffer: &mut [u8]) -> Result<usize, ()> {
(&self[..]).serialize(buffer)
fn serialize(self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
self.as_bytes().serialize(buffer)
}
}

impl<const N: usize> ToPayload for &[u8; N] {
type Error = ();

fn serialize(&self, buffer: &mut [u8]) -> Result<usize, ()> {
fn serialize(self, buffer: &mut [u8]) -> Result<usize, ()> {
(&self[..]).serialize(buffer)
}
}

pub struct DeferredPublication<E, F: Fn(&mut [u8]) -> Result<usize, E>> {
/// A publication where the payload is serialized directly into the transmission buffer in the
/// future.
///
/// # Note
/// This is "deferred" because the closure will only be called once the publication is actually
/// sent.
pub struct DeferredPublication<E, F: FnOnce(&mut [u8]) -> Result<usize, E>> {
func: F,
}

impl<E, F: Fn(&mut [u8]) -> Result<usize, E>> DeferredPublication<E, F> {
impl<E, F: FnOnce(&mut [u8]) -> Result<usize, E>> DeferredPublication<E, F> {
pub fn new<'a>(func: F) -> Publication<'a, Self> {
Publication::new(Self { func })
}
}

impl<E, F: Fn(&mut [u8]) -> Result<usize, E>> ToPayload for DeferredPublication<E, F> {
impl<E, F: FnOnce(&mut [u8]) -> Result<usize, E>> ToPayload for DeferredPublication<E, F> {
type Error = E;
fn serialize(&self, buffer: &mut [u8]) -> Result<usize, E> {
fn serialize(self, buffer: &mut [u8]) -> Result<usize, E> {
(self.func)(buffer)
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/ser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,15 @@ impl<'a> MqttSerializer<'a> {

pub fn pub_to_buffer_meta<P: crate::publication::ToPayload>(
buf: &'a mut [u8],
pub_packet: &Pub<'a, P>,
pub_packet: Pub<'_, P>,
) -> Result<(usize, &'a [u8]), PubError<P::Error>> {
let mut serializer = crate::ser::MqttSerializer::new(buf);
pub_packet
.serialize(&mut serializer)
.map_err(PubError::Error)?;

// Next, serialize the payload into the remaining buffer
let flags = pub_packet.fixed_header_flags();
let len = pub_packet
.payload
.serialize(serializer.remainder())
Expand All @@ -149,14 +150,14 @@ impl<'a> MqttSerializer<'a> {

// Finally, finish the packet and send it.
let (offset, packet) = serializer
.finalize(MessageType::Publish, pub_packet.fixed_header_flags())
.finalize(MessageType::Publish, flags)
.map_err(PubError::Error)?;
Ok((offset, packet))
}

pub fn pub_to_buffer<P: crate::publication::ToPayload>(
buf: &'a mut [u8],
pub_packet: &Pub<'a, P>,
pub_packet: Pub<'_, P>,
) -> Result<&'a [u8], PubError<P::Error>> {
let (_, packet) = Self::pub_to_buffer_meta(buf, pub_packet)?;
Ok(packet)
Expand Down
2 changes: 1 addition & 1 deletion tests/at_least_once_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn main() -> std::io::Result<()> {
if !published && mqtt.client().can_publish(QoS::AtLeastOnce) {
mqtt.client()
.publish(
Publication::new("Ping".as_bytes())
Publication::new("Ping")
.topic("data")
.qos(QoS::AtLeastOnce)
.finish()
Expand Down
2 changes: 1 addition & 1 deletion tests/exactly_once_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fn main() -> std::io::Result<()> {
if !published && mqtt.client().can_publish(QoS::ExactlyOnce) {
mqtt.client()
.publish(
Publication::new("Ping".as_bytes())
Publication::new("Ping")
.topic("data")
.qos(QoS::ExactlyOnce)
.finish()
Expand Down

0 comments on commit cec9ce7

Please sign in to comment.