The Easy MQTT asset lets developers establish MQTT connections and start communicating quickly. At its core is a DLL that wraps MQTTnet, a widely used .NET MQTT library.
Easy MQTT is more than a thin wrapper, however. It extends the base library with additional features and is aimed at developers of all skill levels and at projects ranging from small prototypes to complex applications.
Our team provides ongoing development, bug fixes, and maintenance for the package. If you have questions or feature requests, or if you run into issues, please feel free to contact us!
Email: contact@bytefactory.hu
To simplify the creation of MQTT clients, the asset provides a dedicated factory class. This approach encapsulates the client instantiation logic and ensures a consistent and reliable setup process.
When creating a client, a configuration object must be provided. This configuration defines all required connection parameters, including transport type, authentication, and reconnection behavior.
var client = new MqttClientFactory().CreateMqttClient(config);
The factory method CreateMqttClient returns an instance of the IEasyMqttClient interface (from the EasyMqtt.Interfaces namespace).
// MQTT client interface
public interface IEasyMqttClient : IAsyncDisposable {
    MqttClientConfig Config { get; }
    Guid Id { get; }
    MQTTConnectionState ConnectionState { get; }

    // Connection lifecycle and messaging events
    event Func<string, Task> OnClientConnected;
    event Func<DisconnectReason, Task> OnClientDisconnected;
    event Action<ITopicSubscription> OnSubscribed;
    event Action<ITopicSubscription> OnUnsubscribed;
    event Action<string, QoS, string> OnMessageReceived;
    event Action<Exception> OnError;

    Task ConnectAsync(CancellationToken ct = default);
    Task DisconnectAsync(DisconnectReason reason = DisconnectReason.NormalDisconnection, CancellationToken ct = default);
    Task<ITopicSubscription> SubscribeAsync(string topic, QoS qos = QoS.AtMostOnce, CancellationToken ct = default);
    Task UnsubscribeAsync(ITopicSubscription subscription, CancellationToken ct = default);
    Task PublishAsync(string topic, string payload, QoS qos = QoS.AtMostOnce, bool retain = false, CancellationToken ct = default);
    Task PublishAsync<T>(string topic, T payload, QoS qos = QoS.AtMostOnce, bool retain = false, CancellationToken ct = default);
    IMqttSubscriptionBuilder CreateSubscription();
}
The MqttClientConfig class defines all parameters required to establish a connection.
// client configuration
public class MqttClientConfig {
public string Host = "broker.example.com";
public int Port = 1883;
public TransportType Transport = TransportType.TCP;
public string WebSocketEndpoint = string.Empty;
public bool UseCredentials = false;
public string Username = "<username>";
public string Password = "<password>";
public MQTTConnectionMode ConnectionMode = MQTTConnectionMode.PlainText;
public string ExpectedHash = "<hash>";
public bool AutoReconnect = false;
public int ReconnectDelayMs = 5000;
}
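When a WebSocket transport is used, WebSocketEndpoint specifies the endpoint path on the broker (for example, /mqtt).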
var config = new MqttClientConfig {
Host = "broker.example.com",
Port = 1883
};
var factory = new MqttClientFactory();
var client = factory.CreateMqttClient(config);
After creating the client, nearly all functionality of the DLL is accessed through this instance.
The Easy MQTT client provides multiple ways to subscribe to topics, ranging from simple one-line subscriptions to a powerful and expressive fluent builder approach.
For quick and straightforward scenarios, you can subscribe with a single method call:
var subscription = await client.SubscribeAsync("my/topic", QoS.AtMostOnce);
This approach is ideal when you only need basic subscription functionality without additional processing logic.
For more advanced use cases, Easy MQTT provides a fluent builder API that allows you to define subscriptions in a clean, readable, and composable way.
await client.CreateSubscription()
.WithTopic("game/events")
.WithQos(QoS.AtLeastOnce)
.AddTopicHandler(payload => {
Console.WriteLine($"Event received: {payload}");
return Task.CompletedTask;
})
.SubscribeAsync();
The fluent API is designed to be both easy to read and easy to extend, making it suitable for both simple and complex scenarios.
// One string handler
await client.CreateSubscription()
    .WithTopic("logs/system")
    .AddTopicHandler(payload => {
        Console.WriteLine(payload);
        return Task.CompletedTask;
    })
    .SubscribeAsync();

// Several handlers on the same topic
await client.CreateSubscription()
    .WithTopic("game/state")
    .AddTopicHandler(payload => {
        Console.WriteLine("Handler 1");
        return Task.CompletedTask;
    })
    .AddTopicHandler(payload => {
        Console.WriteLine("Handler 2");
        return Task.CompletedTask;
    })
    .SubscribeAsync();

// Typed handler: the payload arrives as a PlayerDto
await client.CreateSubscription()
    .WithTopic("player/update")
    .AddTopicHandler<PlayerDto>(data => {
        Console.WriteLine(data.Username);
        return Task.CompletedTask;
    })
    .SubscribeAsync();
In this case, the payload is automatically deserialized into a strongly typed object using the configured serializer.
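Easy MQTT supports two types of message handlers: string handlers, which receive the raw payload as text, and typed handlers, which receive the payload deserialized into an object.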
This allows you to choose between maximum flexibility (string) and type safety (DTO), depending on your use case.
Internally, all subscriptions are managed by a router. Instead of binding a single handler to a topic, Easy MQTT introduces a route-based system, where multiple handlers can be attached and managed independently.
Each handler is associated with a unique route, represented by a RouteId.
public interface IRouteLink {
Guid RouteId { get; }
string Topic { get; }
}
var subscription = await client.CreateSubscription()
.WithTopic("chat/messages")
.AddTopicHandler(msg => {
Console.WriteLine("UI handler");
return Task.CompletedTask;
})
.AddTopicHandler(msg => {
Console.WriteLine("Logging handler");
return Task.CompletedTask;
})
.SubscribeAsync();
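In this example, both handlers are attached to the chat/messages topic, and each one is registered under its own route.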
This architecture enables clean separation of concerns and scalable message processing.
When using the fluent subscription builder, multiple handlers can be attached to the same topic. However, these handlers are managed collectively as part of a single subscription.
This means that individual handlers cannot be modified or removed separately, as the builder does not expose their underlying route identifiers.
await client.CreateSubscription()
.WithTopic("game/state")
.AddTopicHandler(handler1)
.AddTopicHandler(handler2)
.SubscribeAsync();
For scenarios where individual handlers need to be managed dynamically, Easy MQTT exposes a lower-level routing API via IMqttRouterApi.
public interface IMqttRouterApi {
IRouteLink AddRoute(TopicReceiveHandler handler);
IRouteLink AddRoute<T>(TopicReceiveHandler<T> handler);
void DeleteRoute(IRouteLink link);
void DeleteAllRoutes();
}
Each call to AddRoute returns an IRouteLink, which uniquely identifies the registered handler.
var subscription = await client.SubscribeAsync("chat/messages");
var route1 = subscription.AddRoute(msg => {
Console.WriteLine("Handler 1");
return Task.CompletedTask;
});
var route2 = subscription.AddRoute(msg => {
Console.WriteLine("Handler 2");
return Task.CompletedTask;
});
// Later: remove a specific handler
subscription.DeleteRoute(route1);
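This dual approach lets you choose between the fluent builder, where all handlers of a subscription are managed together, and the routing API, where each handler is added and removed individually through its IRouteLink.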
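As a rough mental model of the route-based system described above, the following sketch keeps one handler per RouteId and wraps typed handlers around a JSON deserializer. It is an illustration only: SketchRouter, SketchRouteLink, SketchPlayer, and the use of System.Text.Json are assumptions made for this example and are not taken from the Easy MQTT DLL, whose internals may differ.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical stand-in for IRouteLink: identifies one registered handler.
public sealed class SketchRouteLink
{
    public SketchRouteLink(Guid routeId, string topic) { RouteId = routeId; Topic = topic; }
    public Guid RouteId { get; }
    public string Topic { get; }
}

// Hypothetical DTO used only to demonstrate the typed overload.
public sealed class SketchPlayer
{
    public string Username { get; set; }
}

// Hypothetical per-topic router: one entry per route, each removable on its own.
public sealed class SketchRouter
{
    private readonly ConcurrentDictionary<Guid, Func<string, Task>> _routes = new();
    private readonly string _topic;

    public SketchRouter(string topic) => _topic = topic;

    public int RouteCount => _routes.Count;

    // String handler: receives the raw payload.
    public SketchRouteLink AddRoute(Func<string, Task> handler)
    {
        var link = new SketchRouteLink(Guid.NewGuid(), _topic);
        _routes[link.RouteId] = handler;
        return link;
    }

    // Typed handler: the payload is deserialized before the handler runs.
    public SketchRouteLink AddRoute<T>(Func<T, Task> handler)
    {
        Func<string, Task> wrapped = json => handler(JsonSerializer.Deserialize<T>(json));
        return AddRoute(wrapped);
    }

    public void DeleteRoute(SketchRouteLink link) => _routes.TryRemove(link.RouteId, out _);

    public void DeleteAllRoutes() => _routes.Clear();

    // Passes the payload to every route that is still registered.
    // The dictionary does not guarantee any particular handler order.
    public async Task HandleMessageAsync(string payload)
    {
        foreach (var handler in _routes.Values)
            await handler(payload);
    }
}
```

Because every handler has its own key, removing one route leaves the others untouched, which is the behavior DeleteRoute is described as providing.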
In addition to per-topic handlers, the client exposes a global event:
client.OnMessageReceived += (topic, qos, payload) => {
Console.WriteLine($"[{topic}] {payload}");
};
This event is triggered for every incoming message, regardless of subscriptions.
Subscriptions in Easy MQTT are tightly coupled to the lifecycle of the client connection.
When a subscription is created, it remains active as long as the client stays connected. However, subscriptions are not persistent across disconnections: they are cleared when the connection ends and have to be created again after the next successful ConnectAsync().
// Internal behavior on disconnect
foreach(var subscription in _subscriptions.Values)
subscription.DeleteAllRoutes();
_subscriptions.Clear();
If AutoReconnect is enabled, the client will attempt to reconnect automatically.
However, subscriptions are not restored automatically after reconnection.
This means that after a reconnect, all subscriptions must be re-created manually.
It is recommended to re-subscribe inside the OnClientConnected event:
client.OnClientConnected += async (_) => {
await client.CreateSubscription()
.WithTopic("game/events")
.AddTopicHandler(msg => {
Console.WriteLine(msg);
return Task.CompletedTask;
})
.SubscribeAsync();
};
A common pattern is to store subscription definitions and re-subscribe automatically when the client reconnects.
public class MqttService {
private readonly IEasyMqttClient _client;
// Store subscription definitions (not actual subscriptions)
private readonly List<Func<Task>> _subscriptionDefinitions = new();
public MqttService(IEasyMqttClient client) {
_client = client;
// Register lifecycle events
_client.OnClientConnected += OnConnected;
_client.OnClientDisconnected += OnDisconnected;
// Define subscriptions (no actual subscribe happens here)
RegisterSubscriptions();
}
private void RegisterSubscriptions() {
// Only defining what should happen later on connect
_subscriptionDefinitions.Add(async () => {
await _client.CreateSubscription()
.WithTopic("game/events")
.AddTopicHandler(msg => {
Console.WriteLine($"Event: {msg}");
return Task.CompletedTask;
})
.SubscribeAsync();
});
_subscriptionDefinitions.Add(async () => {
await _client.CreateSubscription()
.WithTopic("player/update")
.AddTopicHandler<PlayerDto>(data => {
Console.WriteLine($"Player: {data.Username}");
return Task.CompletedTask;
})
.SubscribeAsync();
});
}
private async Task OnConnected(string clientId) {
Console.WriteLine($"Connected: {clientId}");
// Actual subscriptions happen here
foreach (var subscribe in _subscriptionDefinitions) {
await subscribe();
}
}
private async Task OnDisconnected(DisconnectReason reason) {
Console.WriteLine($"Disconnected: {reason}");
// Optional: handle cleanup, UI updates, etc.
await Task.CompletedTask;
}
}
Important: The RegisterSubscriptions method does not perform any actual subscriptions. It only defines the subscription logic, which is executed later when the client successfully connects.
The stored definitions run each time OnClientConnected is triggered, which makes the pattern a natural fit for AutoReconnect.
Both simple subscriptions and fluent builder subscriptions trigger the OnSubscribed event:
client.OnSubscribed += subscription => { Console.WriteLine($"Subscribed to topic: {subscription.Topic}"); };
The Easy MQTT DLL manages subscriptions internally through a concurrent dictionary. Each ITopicSubscription represents a registered subscription, and unsubscribe operations remove it both locally and from the MQTT broker.
// Subscribe directly
var subscription = await client.SubscribeAsync("game/events", QoS.AtMostOnce);
// Unsubscribe
await client.UnsubscribeAsync(subscription);
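Unsubscribing removes the subscription from the internal dictionary (_subscriptions), deletes all of its routes (DeleteAllRoutes()), and triggers the OnUnsubscribed event.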
The fluent builder creates a TopicSubscription only when SubscribeAsync() is called.
If multiple handlers are attached via the builder, they are grouped under the same subscription and are removed together.
// Fluent subscription
var sub = await client.CreateSubscription()
.WithTopic("game/state")
.AddTopicHandler(handler1)
.AddTopicHandler(handler2)
.SubscribeAsync();
// Remove all handlers at once
await client.UnsubscribeAsync(sub);
For more control, you can manipulate individual routes using AddRoute and DeleteRoute:
// Subscribe to a topic
var subscription = await client.SubscribeAsync("chat/messages");
// Add individual routes
var route1 = subscription.AddRoute(msg => {
Console.WriteLine("Handler 1");
return Task.CompletedTask;
});
var route2 = subscription.AddRoute<ChatDto>(data => {
Console.WriteLine($"Handler 2: {data.Username}");
return Task.CompletedTask;
});
// Remove a specific route without unsubscribing the whole subscription
subscription.DeleteRoute(route1);
// Remove all routes
subscription.DeleteAllRoutes();
Each route is identified by its RouteId.
Both fluent and manual unsubscriptions trigger the OnUnsubscribed event:
client.OnUnsubscribed += subscription => {
Console.WriteLine($"Unsubscribed from topic: {subscription.Topic}");
};
Easy MQTT can publish both plain string payloads and strongly typed objects. Each message can be sent with a specific QoS level and, optionally, with the retain flag.
To send a plain string payload to a topic:
// Ensure the client is connected first
await client.PublishAsync("game/events", "Player joined", QoS.AtMostOnce);
For type-safe messages, publish a DTO that will be serialized to JSON:
public class PlayerDto {
public string Username { get; set; }
public int Score { get; set; }
}
// Publishing a typed object
var player = new PlayerDto { Username = "Alice", Score = 100 };
await client.PublishAsync("player/update", player, QoS.AtLeastOnce);
The payload is automatically serialized using the configured IMqttMsgSerializer.
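To illustrate what such a serializer might do, here is a minimal sketch of an IMqttMsgSerializer implementation. The interface shape is copied from the API reference in this document and repeated only so that the sketch compiles on its own. The class name SystemTextJsonSerializer and the choice of System.Text.Json are assumptions; the serializer shipped with the asset may use a different JSON library and different settings.

```csharp
using System.Text.Json;

// Interface shape as listed in the API reference of this document.
public interface IMqttMsgSerializer
{
    string SerializeToJson<T>(T payload);
    T DeserializeFromJson<T>(string payload);
}

// Hypothetical implementation based on System.Text.Json with default options.
public sealed class SystemTextJsonSerializer : IMqttMsgSerializer
{
    public string SerializeToJson<T>(T payload) => JsonSerializer.Serialize(payload);

    public T DeserializeFromJson<T>(string payload) => JsonSerializer.Deserialize<T>(payload);
}

// Same DTO shape as the PlayerDto used in the publishing example.
public class PlayerDto
{
    public string Username { get; set; }
    public int Score { get; set; }
}
```

With a serializer like this, publishing a PlayerDto sends a JSON object with Username and Score properties, and a typed handler on the receiving side can turn that text back into an equivalent object.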
Messages can be published with the retain flag to persist the last message on a topic:
// Retained message
await client.PublishAsync("game/status", "Server online", QoS.AtLeastOnce, retain: true);
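The effect of the retain flag is easier to see from the broker's side. The toy model below is not part of Easy MQTT and is not a real broker; RetainModel and its members are hypothetical names, and it only handles exact topic names (no wildcards). It follows the MQTT rule that a broker keeps the last retained payload per topic, hands it to new subscribers, and clears it when an empty retained payload is published.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of broker-side retain handling (illustration only).
public sealed class RetainModel
{
    private readonly Dictionary<string, string> _retained = new();

    // A retained publish replaces the stored value for the topic.
    // An empty retained payload removes the stored value.
    // A non-retained publish leaves the stored value untouched.
    public void Publish(string topic, string payload, bool retain)
    {
        if (!retain) return;
        if (payload.Length == 0) _retained.Remove(topic);
        else _retained[topic] = payload;
    }

    // A new subscriber receives the retained payload, or null if none is stored.
    public string OnSubscribe(string topic) =>
        _retained.TryGetValue(topic, out var payload) ? payload : null;
}
```

In this model, a client that subscribes to game/status after "Server online" was published with retain: true still receives that message, which is why retained messages suit status-like topics.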
When the client disconnects, either manually through DisconnectAsync or because of network issues, all active subscriptions are cleared automatically.
On disconnect, the client performs the following steps:
1. Calls DeleteAllRoutes() on every active subscription.
2. Clears the _subscriptions dictionary.
// Internal behavior on disconnect (simplified)
foreach (var subscription in _subscriptions.Values)
subscription.DeleteAllRoutes();
_subscriptions.Clear();
// OnClientDisconnected is raised separately, in ClientDisconnected()
// Notify the application when the client disconnects
client.OnClientDisconnected += async reason => {
Console.WriteLine($"Client disconnected: {reason}");
// Optionally: update UI or perform cleanup
};
// Notify for each subscription removed
client.OnUnsubscribed += subscription => {
Console.WriteLine($"Subscription removed during disconnect: {subscription.Topic}");
};
After a reconnect, subscriptions have to be re-created, typically in OnClientConnected.
When the Easy MQTT client is no longer needed, it should be disposed properly to release resources, stop background loops, and remove all subscriptions. The client implements IAsyncDisposable via DisposeAsync().
The DisposeAsync method performs the following steps in order:
1. Acquires the AsyncLock to ensure thread-safe disposal.
2. Detaches the internal event handlers (ConnectedAsync, DisconnectedAsync, ApplicationMessageReceivedAsync).
3. Clears the _subscriptions dictionary.
4. Disposes the underlying _client instance.
5. Sets the _disposed flag to prevent further usage.
await client.DisposeAsync();
After disposal, the client must not be used again. Keep the following in mind:
- OnClientDisconnected or OnUnsubscribed may be triggered as part of cleanup.
- Call DisposeAsync when the client is no longer needed to avoid memory leaks or dangling connections.
- The AsyncLock ensures that dispose operations are thread-safe and cannot run concurrently with connection or subscription logic.
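The disposal sequence described above can be sketched with a small stand-alone class. This is a simplified illustration under stated assumptions, not the DLL's code: SketchClient is a hypothetical name, a SemaphoreSlim stands in for the AsyncLock, and the steps that touch the underlying MQTT client are shown only as comments.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical client skeleton that mirrors the documented disposal order.
public sealed class SketchClient : IAsyncDisposable
{
    private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1); // stand-in for AsyncLock
    private readonly ConcurrentDictionary<string, object> _subscriptions = new();
    private bool _disposed;

    public int SubscriptionCount => _subscriptions.Count;

    public async Task SubscribeAsync(string topic)
    {
        await _lock.WaitAsync();
        try
        {
            // The flag blocks any use after disposal.
            if (_disposed) throw new ObjectDisposedException(nameof(SketchClient));
            _subscriptions[topic] = new object();
        }
        finally { _lock.Release(); }
    }

    public async ValueTask DisposeAsync()
    {
        await _lock.WaitAsync();      // take the lock so disposal cannot overlap other operations
        try
        {
            if (_disposed) return;    // a second call does nothing
            // A real client would detach its internal event handlers here.
            _subscriptions.Clear();   // drop all subscriptions
            // A real client would dispose the underlying MQTT client here.
            _disposed = true;         // prevent further usage
        }
        finally { _lock.Release(); }
    }
}
```

Taking the same lock in SubscribeAsync and DisposeAsync is what keeps disposal from running concurrently with subscription logic in this sketch.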
This example shows how to fully manage an Easy MQTT client in Unity using MonoBehaviour.
It covers creating the client, connecting, subscribing, receiving messages, unsubscribing, and disposing.
Unity main thread safety is ensured using a dispatcher.
using UnityEngine;
using EasyMqtt.Client;
using EasyMqtt.Types;
using System;
using System.Threading.Tasks;
public class MqttUnityManager : MonoBehaviour
{
private IEasyMqttClient _client;
private ITopicSubscription _gameEventsSub;
async void Start()
{
// 1. Create MQTT client
_client = new EasyMqttClient(
new MqttClientConfig { Host = "broker.example.com", AutoReconnect = true },
new MqttRouter(),
new JsonSerializer()
);
// 2. Register connection events
_client.OnClientConnected += OnConnected;
_client.OnClientDisconnected += OnDisconnected;
_client.OnMessageReceived += OnMessageReceived;
_client.OnSubscribed += sub => Debug.Log($"Subscribed to: {sub.Topic}");
_client.OnUnsubscribed += sub => Debug.Log($"Unsubscribed from: {sub.Topic}");
// 3. Connect
await _client.ConnectAsync();
}
private async Task OnConnected(string clientId)
{
Debug.Log($"MQTT connected: {clientId}");
// 4. Subscribe to topics
_gameEventsSub = await _client.CreateSubscription()
.WithTopic("game/events")
.WithQos(QoS.AtLeastOnce)
.AddTopicHandler(msg =>
{
// Dispatch to Unity main thread
UnityMainThreadDispatcher.Instance().Enqueue(() =>
{
Debug.Log($"Game Event: {msg}");
// Example: update UI or GameObjects here
});
return Task.CompletedTask;
})
.SubscribeAsync();
}
private Task OnDisconnected(DisconnectReason reason)
{
Debug.Log($"MQTT disconnected: {reason}");
return Task.CompletedTask;
}
private void OnMessageReceived(string topic, QoS qos, string payload)
{
// Optional global handler
UnityMainThreadDispatcher.Instance().Enqueue(() =>
{
Debug.Log($"[Global] {topic}: {payload}");
});
}
// Example method to unsubscribe manually
public async Task UnsubscribeGameEvents()
{
if (_gameEventsSub != null)
{
await _client.UnsubscribeAsync(_gameEventsSub);
_gameEventsSub = null;
}
}
async void OnDestroy()
{
// 5. Clean up
if (_client != null)
{
await UnsubscribeGameEvents();
await _client.DisposeAsync();
_client = null;
}
}
}
Key points of this example:
- UnityMainThreadDispatcher.Instance().Enqueue(Action) is used to marshal async callbacks safely.
- The client is created as an EasyMqttClient with config and router.
- ConnectAsync() connects to the broker and automatically triggers OnClientConnected.
- Topics are subscribed through the fluent builder, ending with SubscribeAsync().
- Every incoming message is also reported through OnMessageReceived.
- UnsubscribeAsync() is called to remove a subscription.
- DisposeAsync() is called on MonoBehaviour destroy to clean up all subscriptions and client resources.
Unity API calls (like updating GameObjects or UI) must always run on the main thread. To safely execute MQTT callbacks or other async tasks, a main thread dispatcher is required.
using System;
using System.Collections.Concurrent;
using UnityEngine;

public class UnityMainThreadDispatcher : MonoBehaviour
{
    private static UnityMainThreadDispatcher _instance;
    private static readonly ConcurrentQueue<Action> _executionQueue = new ConcurrentQueue<Action>();

    // Call this once from the main thread (for example in Awake or Start)
    // before any background callback uses it: GameObjects can only be
    // created on the main thread.
    public static UnityMainThreadDispatcher Instance()
    {
        if (_instance == null)
        {
            var obj = new GameObject("MainThreadDispatcher");
            _instance = obj.AddComponent<UnityMainThreadDispatcher>();
            DontDestroyOnLoad(obj);
        }
        return _instance;
    }

    // Safe to call from any thread.
    public void Enqueue(Action action)
    {
        if (action == null) return;
        _executionQueue.Enqueue(action);
    }

    // Runs on the main thread once per frame and drains the queue.
    void Update()
    {
        while (_executionQueue.TryDequeue(out var action))
        {
            action?.Invoke();
        }
    }
}
// Inside an async MQTT message handler
_gameEventsSub = await _client.CreateSubscription()
.WithTopic("game/events")
.AddTopicHandler(msg =>
{
// Dispatch to Unity main thread
UnityMainThreadDispatcher.Instance().Enqueue(() =>
{
Debug.Log($"Game Event: {msg}");
// Safely update GameObjects or UI here
});
return Task.CompletedTask;
})
.SubscribeAsync();
Callbacks are stored in _executionQueue and executed from Update() on the main thread.
The Easy MQTT library exposes a clean and structured public API through interfaces. These interfaces define the capabilities of the client, subscriptions, serialization, and routing.
public interface IEasyMqttClient : IAsyncDisposable {
MqttClientConfig Config { get; }
Guid Id { get; }
MQTTConnectionState ConnectionState { get; }
event Func<string, Task> OnClientConnected;
event Func<DisconnectReason, Task> OnClientDisconnected;
event Action<ITopicSubscription> OnSubscribed;
event Action<ITopicSubscription> OnUnsubscribed;
event Action<string, QoS, string> OnMessageReceived;
event Action<Exception> OnError;
Task ConnectAsync(CancellationToken ct = default);
Task DisconnectAsync(DisconnectReason reason = DisconnectReason.NormalDisconnection, CancellationToken ct = default);
Task<ITopicSubscription> SubscribeAsync(string topic, QoS qos = QoS.AtMostOnce, CancellationToken ct = default);
Task UnsubscribeAsync(ITopicSubscription subscription, CancellationToken ct = default);
Task PublishAsync(string topic, string payload, QoS qos = QoS.AtMostOnce, bool retain = false, CancellationToken ct = default);
Task PublishAsync<T>(string topic, T payload, QoS qos = QoS.AtMostOnce, bool retain = false, CancellationToken ct = default);
IMqttSubscriptionBuilder CreateSubscription();
}
public interface IMqttClientFactory {
IEasyMqttClient CreateMqttClient(MqttClientConfig config);
}
public interface IMqttMsgSerializer {
string SerializeToJson<T>(T payload);
T DeserializeFromJson<T>(string payload);
}
public interface IMqttRouterApi {
IRouteLink AddRoute(TopicReceiveHandler handler);
IRouteLink AddRoute<T>(TopicReceiveHandler<T> handler);
void DeleteRoute(IRouteLink link);
void DeleteAllRoutes();
}
public interface IMqttSubscriptionBuilder {
MqttSubscriptionBuilder WithTopic(string topic);
MqttSubscriptionBuilder WithQos(QoS qos);
MqttSubscriptionBuilder AddTopicHandler(TopicReceiveHandler handler);
MqttSubscriptionBuilder AddTopicHandler<T>(TopicReceiveHandler<T> handler);
Task<ITopicSubscription> SubscribeAsync(CancellationToken ct = default);
}
public interface IRouteLink {
Guid RouteId { get; }
string Topic { get; }
}
public interface ITopicSubscription : IMqttRouterApi {
string Topic { get; }
QoS QoS { get; }
Task HandleMessageAsync(string payload, CancellationToken ct = default);
}
These interfaces form the backbone of the Easy MQTT client, enabling flexible client creation, subscription management, message publishing, and route-based message handling.
This demo scene shows how to integrate the Easy MQTT client into a Unity MonoBehaviour environment. It demonstrates connecting to a broker, subscribing to a topic, publishing messages, and handling events while ensuring all UI updates occur on the main Unity thread using a main thread dispatcher.
Since MQTT events are asynchronous and run on background threads, all UI operations must be dispatched to the main thread. This is done by enqueuing them in a ConcurrentQueue<Action> and draining the queue in Update():
private readonly ConcurrentQueue<Action> _mainThreadActions = new();
private void Update() {
while (_mainThreadActions.TryDequeue(out Action action)) action();
}
private void Invoke(Action action) => _mainThreadActions.Enqueue(action);
The ConnectAsync method creates the client, registers events, and connects to the broker.
Upon successful connection, a topic subscription is created using the fluent builder API:
private async Task ConnectAsync(string broker, int port, bool useTLS, TransportType transportType, string wsEndpoint) {
var config = new MqttClientConfig {
Host = broker,
Port = port,
Transport = transportType,
WebSocketEndpoint = wsEndpoint,
ConnectionMode = useTLS ? MQTTConnectionMode.TLS_CA : MQTTConnectionMode.PlainText
};
if (_client != null) await _client.DisposeAsync();
var mqttClientFactory = new MqttClientFactory();
_client = mqttClientFactory.CreateMqttClient(config);
_client.OnClientConnected += OnClientConnected;
_client.OnClientDisconnected += OnClientDisconnected;
_client.OnMessageReceived += OnMessageReceived;
_client.OnSubscribed += OnSubscribedToTopic;
_client.OnUnsubscribed += OnUnsubscribedFromTopic;
_client.OnError += OnError;
await _client.ConnectAsync();
}
Publishing uses the strongly typed MqttTopicData object, ensuring the payload is associated with the client ID:
public async Task<bool> PublishAsync(string payload, QoS qos = QoS.AtMostOnce, bool retain = false) {
var data = new MqttTopicData(_client.Id, payload);
await _client.PublishAsync<MqttTopicData>(_topic, data, qos, retain);
return true;
}
Disconnecting safely unsubscribes from topics, disconnects the client, and optionally unregisters event handlers:
private async Task DisconnectAsync(bool releaseEvents = false) {
if (_subscription != null) {
await _client.UnsubscribeAsync(_subscription);
_subscription = null;
}
await _client.DisconnectAsync();
if (releaseEvents) {
_client.OnClientConnected -= OnClientConnected;
_client.OnClientDisconnected -= OnClientDisconnected;
_client.OnMessageReceived -= OnMessageReceived;
_client.OnSubscribed -= OnSubscribedToTopic;
_client.OnUnsubscribed -= OnUnsubscribedFromTopic;
_client.OnError -= OnError;
}
}
All incoming messages and MQTT events are dispatched to the main thread to safely update the UI:
private void OnMessageReceived(string topic, QoS qos, string message) {
Invoke(() => _ui.LogInfoToConsole($"Message received on topic [{topic}] with QoS [{qos}]: {message}"));
}
private Task OnTopicMessageReceived(MqttTopicData data) {
Invoke(() => _ui.LogReceivedMessage(data.ClientId.ToString(), data.Payload));
return Task.CompletedTask;
}
The UI MonoBehaviour provides buttons and input fields for connecting, disconnecting, sending messages, and selecting QoS and retain flags. Events from MqttManager are linked to button actions:
- OnConnectButton → triggers ConnectAsync
- OnDisconnectButton → triggers DisconnectAsync
- OnPublishButton → triggers PublishAsync
Subscriptions are created via the fluent builder API and handled asynchronously. Incoming payloads are deserialized into typed objects:
_subscription = await _client.CreateSubscription()
.WithTopic(_topic)
.WithQos(_subscribeQos)
.AddTopicHandler<MqttTopicData>(OnTopicMessageReceived)
.SubscribeAsync();
UI updates are marshalled to the main thread through a ConcurrentQueue<Action>. Cleanup is performed in OnDisable and OnDestroy by disconnecting, unsubscribing, and disposing the client.
This example demonstrates a complete Unity integration scenario with Easy MQTT, covering connection lifecycle, message handling, publishing, and UI integration.