首页 > 编程 > Java > 正文

MQTT初体验mosquitto的java实现

2019-11-09 13:55:33
字体:
来源:转载
供稿:网友

上节简单描述了mosquitto的安装,与测试,本节继续MQTT使用java语言如何简单调用;

以上例子原型来源于网络,经过自己调试可用,目前仅限于代码的示例;

类:

ServerMQTT

ClientMQTT

PushCallback 

注: 例子中需要修改的地方:

        1、localhost:  修改为上节安装服务的ip地址;

         2、1883:修改为上节安装服务的端口,在配置文件中可修改,默认是(1883);

ServerMQTT类:

/** * Created by Administrator on 17-2-10. */import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.MqttPersistenceException;import org.eclipse.paho.client.mqttv3.MqttTopic;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;/** * * Title:Server * Description: 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题 * @author admin * 2017年2月10日下午17:41:10 */public class ServerMQTT {    //tcp://MQTT安装的服务器地址:MQTT定义的端口号    public static final String HOST = "tcp://localhost:1883";    //定义一个主题    public static final String TOPIC = "topic11";    //定义MQTT的ID,可以在MQTT服务配置中指定    PRivate static final String clientid = "server11";    private MqttClient client;    private MqttTopic topic11;    private String userName = "mosquitto";    private String passWord = "";    private MqttMessage message;    /**     * 构造函数     * @throws MqttException     */    public ServerMQTT() throws MqttException {        // MemoryPersistence设置clientid的保存形式,默认为以内存保存        client = new MqttClient(HOST, clientid, new MemoryPersistence());        connect();    }    /**     *  用来连接服务器     */    private void connect() {        MqttConnectOptions options = new MqttConnectOptions();        options.setCleansession(false);        options.setUserName(userName);        options.setPassword(passWord.toCharArray());        // 设置超时时间        options.setConnectionTimeout(10);        // 设置会话心跳时间        options.setKeepAliveInterval(20);        try {            client.setCallback(new PushCallback());            client.connect(options);            topic11 = client.getTopic(TOPIC);        } catch (Exception e) {            e.printStackTrace();        }    }    /**     *     * @param topic     * @param message     * @throws MqttPersistenceException     * @throws MqttException     */    public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,            MqttException {        MqttDeliveryToken token = topic.publish(message);        token.waitForCompletion();        System.out.println("message is published completely! "                + token.isComplete());    }    /**     *  启动入口     * @param args     * @throws MqttException     */    public static void main(String[] args) throws MqttException {        ServerMQTT server = new ServerMQTT();        server.message = new MqttMessage();        server.message.setQos(1);        server.message.setRetained(true);        server.message.setPayload("hello,topic11".getBytes());        server.publish(server.topic11 , server.message);        System.out.println(server.message.isRetained() + "------ratained状态");    }}

ClientMQTT类:

/** * * Description: * @author admin * 2017年2月10日下午17:50:15 */import java.util.concurrent.ScheduledExecutorService;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttTopic;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class ClientMQTT {    public static final String HOST = "tcp://localhost:1883";    public static final String TOPIC = "topic11";    private static final String clientid = "client11";    private MqttClient client;    private MqttConnectOptions options;    private String userName = "admin";    private String passWord = "password";    private ScheduledExecutorService scheduler;    private void start() {        try {            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存            client = new MqttClient(HOST, clientid, new MemoryPersistence());            // MQTT的连接设置            options = new MqttConnectOptions();            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接            options.setCleanSession(true);            // 设置连接的用户名            options.setUserName(userName);            // 设置连接的密码            options.setPassword(passWord.toCharArray());            // 设置超时时间 单位为秒            options.setConnectionTimeout(10);            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制            options.setKeepAliveInterval(20);            // 设置回调            client.setCallback(new PushCallback());            MqttTopic topic = client.getTopic(TOPIC);            //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息            options.setWill(topic, "close".getBytes(), 2, true);            client.connect(options);            //订阅消息            int[] Qos  = {1};            String[] topic1 = {TOPIC};            client.subscribe(topic1, Qos);        } catch (Exception e) {            e.printStackTrace();        }    }    public static void main(String[] args) throws MqttException {        ClientMQTT client = new ClientMQTT();        client.start();    }}PushCallback类:

/** * * Description: * @author admin * 2017年2月10日下午18:04:07 */import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttMessage;/** * 发布消息的回调类 * * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 * 在回调中,将它用来标识已经启动了该回调的哪个实例。 * 必须在回调类中实现三个方法: * *  public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。 * *  public void connectionLost(Throwable cause)在断开连接时调用。 * *  public void deliveryComplete(MqttDeliveryToken token)) *  接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。 *  由 MqttClient.connect 激活此回调。 * */public class PushCallback implements MqttCallback {    public void connectionLost(Throwable cause) {        // 连接丢失后,一般在这里面进行重连        System.out.println("连接断开,可以做重连");    }    public void deliveryComplete(IMqttDeliveryToken token) {        System.out.println("deliveryComplete---------" + token.isComplete());    }    public void messageArrived(String topic, MqttMessage message) throws Exception {        // subscribe后得到的消息会执行到这里面        System.out.println("接收消息主题 : " + topic);        System.out.println("接收消息Qos : " + message.getQos());        System.out.println("接收消息内容 : " + new String(message.getPayload()));    }}server类启动的结果:

deliveryComplete---------truemessage is published completely! truetrue------ratained状态client类启动的结果:

接收消息主题 : topic11接收消息Qos : 1接收消息内容 : hello,topic11


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表