embox

Форк
0
217 строк · 6.8 Кб
1
// async_subscribe.cpp
2
//
3
// This is a Paho MQTT C++ client, sample application.
4
//
5
// This application is an MQTT subscriber using the C++ asynchronous client
6
// interface, employing callbacks to receive messages and status updates.
7
//
8
// The sample demonstrates:
9
//  - Connecting to an MQTT server/broker.
10
//  - Subscribing to a topic
11
//  - Receiving messages through the callback API
12
//  - Receiving network disconnect updates and attempting manual reconnects.
13
//  - Using a "clean session" and manually re-subscribing to topics on
14
//    reconnect.
15
//
16

17
/*******************************************************************************
18
 * Copyright (c) 2013-2020 Frank Pagliughi <fpagliughi@mindspring.com>
19
 *
20
 * All rights reserved. This program and the accompanying materials
21
 * are made available under the terms of the Eclipse Public License v1.0
22
 * and Eclipse Distribution License v1.0 which accompany this distribution.
23
 *
24
 * The Eclipse Public License is available at
25
 *    http://www.eclipse.org/legal/epl-v10.html
26
 * and the Eclipse Distribution License is available at
27
 *   http://www.eclipse.org/org/documents/edl-v10.php.
28
 *
29
 * Contributors:
30
 *    Frank Pagliughi - initial implementation and documentation
31
 *******************************************************************************/
32

33
#include <iostream>
34
#include <cstdlib>
35
#include <string>
36
#include <cstring>
37
#include <cctype>
38
#include <thread>
39
#include <chrono>
40
#include "mqtt/async_client.h"
41

42
#include <paho_mqtt_cpp_subscribe.inc>
43

44
static const std::string SERVER_ADDRESS(MQTT_SERVER_ADDRESS);
45
static const std::string CLIENT_ID("paho_cpp_async_subcribe");
46
static const std::string TOPIC(MQTT_SERVER_TOPIC);
47

48
static const int	QOS = 1;
49
static const int	N_RETRY_ATTEMPTS = 5;
50

51
/////////////////////////////////////////////////////////////////////////////
52

53
// Callbacks for the success or failures of requested actions.
54
// This could be used to initiate further action, but here we just log the
55
// results to the console.
56

57
class action_listener : public virtual mqtt::iaction_listener
58
{
59
	std::string name_;
60

61
	void on_failure(const mqtt::token& tok) override {
62
		std::cout << name_ << " failure";
63
		if (tok.get_message_id() != 0)
64
			std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
65
		std::cout << std::endl;
66
	}
67

68
	void on_success(const mqtt::token& tok) override {
69
		std::cout << name_ << " success";
70
		if (tok.get_message_id() != 0)
71
			std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
72
		auto top = tok.get_topics();
73
		if (top && !top->empty())
74
			std::cout << "\ttoken topic: '" << (*top)[0] << "', ..." << std::endl;
75
		std::cout << std::endl;
76
	}
77

78
public:
79
	action_listener(const std::string& name) : name_(name) {}
80
};
81

82
/////////////////////////////////////////////////////////////////////////////
83

84
/**
85
 * Local callback & listener class for use with the client connection.
86
 * This is primarily intended to receive messages, but it will also monitor
87
 * the connection to the broker. If the connection is lost, it will attempt
88
 * to restore the connection and re-subscribe to the topic.
89
 */
90
class callback : public virtual mqtt::callback,
91
					public virtual mqtt::iaction_listener
92

93
{
94
	// Counter for the number of connection retries
95
	int nretry_;
96
	// The MQTT client
97
	mqtt::async_client& cli_;
98
	// Options to use if we need to reconnect
99
	mqtt::connect_options& connOpts_;
100
	// An action listener to display the result of actions.
101
	action_listener subListener_;
102

103
	// This deomonstrates manually reconnecting to the broker by calling
104
	// connect() again. This is a possibility for an application that keeps
105
	// a copy of it's original connect_options, or if the app wants to
106
	// reconnect with different options.
107
	// Another way this can be done manually, if using the same options, is
108
	// to just call the async_client::reconnect() method.
109
	void reconnect() {
110
		std::this_thread::sleep_for(std::chrono::milliseconds(2500));
111
		try {
112
			cli_.connect(connOpts_, nullptr, *this);
113
		}
114
		catch (const mqtt::exception& exc) {
115
			std::cerr << "Error: " << exc.what() << std::endl;
116
			exit(1);
117
		}
118
	}
119

120
	// Re-connection failure
121
	void on_failure(const mqtt::token& tok) override {
122
		std::cout << "Connection attempt failed" << std::endl;
123
		if (++nretry_ > N_RETRY_ATTEMPTS)
124
			exit(1);
125
		reconnect();
126
	}
127

128
	// (Re)connection success
129
	// Either this or connected() can be used for callbacks.
130
	void on_success(const mqtt::token& tok) override {}
131

132
	// (Re)connection success
133
	void connected(const std::string& cause) override {
134
		std::cout << "\nConnection success" << std::endl;
135
		std::cout << "\nSubscribing to topic '" << TOPIC << "'\n"
136
			<< "\tfor client " << CLIENT_ID
137
			<< " using QoS" << QOS << "\n"
138
			<< "\nPress Q<Enter> to quit\n" << std::endl;
139

140
		cli_.subscribe(TOPIC, QOS, nullptr, subListener_);
141
	}
142

143
	// Callback for when the connection is lost.
144
	// This will initiate the attempt to manually reconnect.
145
	void connection_lost(const std::string& cause) override {
146
		std::cout << "\nConnection lost" << std::endl;
147
		if (!cause.empty())
148
			std::cout << "\tcause: " << cause << std::endl;
149

150
		std::cout << "Reconnecting..." << std::endl;
151
		nretry_ = 0;
152
		reconnect();
153
	}
154

155
	// Callback for when a message arrives.
156
	void message_arrived(mqtt::const_message_ptr msg) override {
157
		std::cout << "Message arrived" << std::endl;
158
		std::cout << "\ttopic: '" << msg->get_topic() << "'" << std::endl;
159
		std::cout << "\tpayload: '" << msg->to_string() << "'\n" << std::endl;
160
	}
161

162
	void delivery_complete(mqtt::delivery_token_ptr token) override {}
163

164
public:
165
	callback(mqtt::async_client& cli, mqtt::connect_options& connOpts)
166
				: nretry_(0), cli_(cli), connOpts_(connOpts), subListener_("Subscription") {}
167
};
168

169
/////////////////////////////////////////////////////////////////////////////
170

171
int main(int argc, char* argv[])
172
{
173
	// A subscriber often wants the server to remember its messages when its
174
	// disconnected. In that case, it needs a unique ClientID and a
175
	// non-clean session.
176

177
	mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID);
178

179
	mqtt::connect_options connOpts;
180
	connOpts.set_clean_session(false);
181

182
	// Install the callback(s) before connecting.
183
	callback cb(cli, connOpts);
184
	cli.set_callback(cb);
185

186
	// Start the connection.
187
	// When completed, the callback will subscribe to topic.
188

189
	try {
190
		std::cout << "Connecting to the MQTT server..." << std::flush;
191
		cli.connect(connOpts, nullptr, cb);
192
	}
193
	catch (const mqtt::exception& exc) {
194
		std::cerr << "\nERROR: Unable to connect to MQTT server: '"
195
			<< SERVER_ADDRESS << "'" << exc << std::endl;
196
		return 1;
197
	}
198

199
	// Just block till user tells us to quit.
200

201
	while (std::tolower(std::cin.get()) != 'q')
202
		;
203

204
	// Disconnect
205

206
	try {
207
		std::cout << "\nDisconnecting from the MQTT server..." << std::flush;
208
		cli.disconnect()->wait();
209
		std::cout << "OK" << std::endl;
210
	}
211
	catch (const mqtt::exception& exc) {
212
		std::cerr << exc << std::endl;
213
		return 1;
214
	}
215

216
 	return 0;
217
}
218

219

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.