embox
217 строк · 6.8 Кб
1// async_subscribe.cpp
2//
3// This is a Paho MQTT C++ client, sample application.
4//
5// This application is an MQTT subscriber using the C++ asynchronous client
6// interface, employing callbacks to receive messages and status updates.
7//
8// The sample demonstrates:
9// - Connecting to an MQTT server/broker.
10// - Subscribing to a topic
11// - Receiving messages through the callback API
12// - Receiving network disconnect updates and attempting manual reconnects.
13// - Using a "clean session" and manually re-subscribing to topics on
14// reconnect.
15//
16
17/*******************************************************************************
18* Copyright (c) 2013-2020 Frank Pagliughi <fpagliughi@mindspring.com>
19*
20* All rights reserved. This program and the accompanying materials
21* are made available under the terms of the Eclipse Public License v1.0
22* and Eclipse Distribution License v1.0 which accompany this distribution.
23*
24* The Eclipse Public License is available at
25* http://www.eclipse.org/legal/epl-v10.html
26* and the Eclipse Distribution License is available at
27* http://www.eclipse.org/org/documents/edl-v10.php.
28*
29* Contributors:
30* Frank Pagliughi - initial implementation and documentation
31*******************************************************************************/
32
33#include <iostream>34#include <cstdlib>35#include <string>36#include <cstring>37#include <cctype>38#include <thread>39#include <chrono>40#include "mqtt/async_client.h"41
42#include <paho_mqtt_cpp_subscribe.inc>43
44static const std::string SERVER_ADDRESS(MQTT_SERVER_ADDRESS);45static const std::string CLIENT_ID("paho_cpp_async_subcribe");46static const std::string TOPIC(MQTT_SERVER_TOPIC);47
48static const int QOS = 1;49static const int N_RETRY_ATTEMPTS = 5;50
51/////////////////////////////////////////////////////////////////////////////
52
53// Callbacks for the success or failures of requested actions.
54// This could be used to initiate further action, but here we just log the
55// results to the console.
56
57class action_listener : public virtual mqtt::iaction_listener58{
59std::string name_;60
61void on_failure(const mqtt::token& tok) override {62std::cout << name_ << " failure";63if (tok.get_message_id() != 0)64std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;65std::cout << std::endl;66}67
68void on_success(const mqtt::token& tok) override {69std::cout << name_ << " success";70if (tok.get_message_id() != 0)71std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;72auto top = tok.get_topics();73if (top && !top->empty())74std::cout << "\ttoken topic: '" << (*top)[0] << "', ..." << std::endl;75std::cout << std::endl;76}77
78public:79action_listener(const std::string& name) : name_(name) {}80};81
82/////////////////////////////////////////////////////////////////////////////
83
84/**
85* Local callback & listener class for use with the client connection.
86* This is primarily intended to receive messages, but it will also monitor
87* the connection to the broker. If the connection is lost, it will attempt
88* to restore the connection and re-subscribe to the topic.
89*/
90class callback : public virtual mqtt::callback,91public virtual mqtt::iaction_listener92
93{
94// Counter for the number of connection retries95int nretry_;96// The MQTT client97mqtt::async_client& cli_;98// Options to use if we need to reconnect99mqtt::connect_options& connOpts_;100// An action listener to display the result of actions.101action_listener subListener_;102
103// This deomonstrates manually reconnecting to the broker by calling104// connect() again. This is a possibility for an application that keeps105// a copy of it's original connect_options, or if the app wants to106// reconnect with different options.107// Another way this can be done manually, if using the same options, is108// to just call the async_client::reconnect() method.109void reconnect() {110std::this_thread::sleep_for(std::chrono::milliseconds(2500));111try {112cli_.connect(connOpts_, nullptr, *this);113}114catch (const mqtt::exception& exc) {115std::cerr << "Error: " << exc.what() << std::endl;116exit(1);117}118}119
120// Re-connection failure121void on_failure(const mqtt::token& tok) override {122std::cout << "Connection attempt failed" << std::endl;123if (++nretry_ > N_RETRY_ATTEMPTS)124exit(1);125reconnect();126}127
128// (Re)connection success129// Either this or connected() can be used for callbacks.130void on_success(const mqtt::token& tok) override {}131
132// (Re)connection success133void connected(const std::string& cause) override {134std::cout << "\nConnection success" << std::endl;135std::cout << "\nSubscribing to topic '" << TOPIC << "'\n"136<< "\tfor client " << CLIENT_ID137<< " using QoS" << QOS << "\n"138<< "\nPress Q<Enter> to quit\n" << std::endl;139
140cli_.subscribe(TOPIC, QOS, nullptr, subListener_);141}142
143// Callback for when the connection is lost.144// This will initiate the attempt to manually reconnect.145void connection_lost(const std::string& cause) override {146std::cout << "\nConnection lost" << std::endl;147if (!cause.empty())148std::cout << "\tcause: " << cause << std::endl;149
150std::cout << "Reconnecting..." << std::endl;151nretry_ = 0;152reconnect();153}154
155// Callback for when a message arrives.156void message_arrived(mqtt::const_message_ptr msg) override {157std::cout << "Message arrived" << std::endl;158std::cout << "\ttopic: '" << msg->get_topic() << "'" << std::endl;159std::cout << "\tpayload: '" << msg->to_string() << "'\n" << std::endl;160}161
162void delivery_complete(mqtt::delivery_token_ptr token) override {}163
164public:165callback(mqtt::async_client& cli, mqtt::connect_options& connOpts)166: nretry_(0), cli_(cli), connOpts_(connOpts), subListener_("Subscription") {}167};168
169/////////////////////////////////////////////////////////////////////////////
170
171int main(int argc, char* argv[])172{
173// A subscriber often wants the server to remember its messages when its174// disconnected. In that case, it needs a unique ClientID and a175// non-clean session.176
177mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID);178
179mqtt::connect_options connOpts;180connOpts.set_clean_session(false);181
182// Install the callback(s) before connecting.183callback cb(cli, connOpts);184cli.set_callback(cb);185
186// Start the connection.187// When completed, the callback will subscribe to topic.188
189try {190std::cout << "Connecting to the MQTT server..." << std::flush;191cli.connect(connOpts, nullptr, cb);192}193catch (const mqtt::exception& exc) {194std::cerr << "\nERROR: Unable to connect to MQTT server: '"195<< SERVER_ADDRESS << "'" << exc << std::endl;196return 1;197}198
199// Just block till user tells us to quit.200
201while (std::tolower(std::cin.get()) != 'q')202;203
204// Disconnect205
206try {207std::cout << "\nDisconnecting from the MQTT server..." << std::flush;208cli.disconnect()->wait();209std::cout << "OK" << std::endl;210}211catch (const mqtt::exception& exc) {212std::cerr << exc << std::endl;213return 1;214}215
216return 0;217}
218
219