首页 > 编程 > Java > 正文

SpringBoot webSocket实现发送广播、点对点消息和Android接收

2019-11-26 12:48:27
字体:
来源:转载
供稿:网友

1、SpringBoot webSocket

SpringBoot 使用的websocket 协议,不是标准的websocket协议,使用的是名称叫做STOMP的协议。

1.1 STOMP协议说明

STOMP,Streaming Text Orientated Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。

它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互,类似于OpenWire(一种二进制协议)。

由于其设计简单,很容易开发客户端,因此在多种语言和多种平台上得到广泛应用。其中最流行的STOMP消息代理是Apache ActiveMQ。

1.2 搭建

本人使用的是Inject idea 搭建的springBoot websocket,并未采用熟悉的gradle,而是采用了maven方式搭建。

项目结构如下

 

pom.xml:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.drawthink</groupId> <artifactId>websocketdemo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>webSocketdemo</name> <description>webSocketDemo project for Spring Boot</description> <parent>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-parent</artifactId>  <version>1.3.6.RELEASE</version>  <relativePath/> <!-- lookup parent from repository --> </parent> <properties>  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>  <java.version>1.8</java.version> </properties> <dependencies>  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-thymeleaf</artifactId>  </dependency>  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-websocket</artifactId>  </dependency>  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-test</artifactId>   <scope>test</scope>  </dependency> </dependencies> <build>  <plugins>   <plugin>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-maven-plugin</artifactId>   </plugin>  </plugins> </build></project>

Application:

package com.drawthink;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class WebSocketdemoApplication { public static void main(String[] args) {  SpringApplication.run(WebSocketdemoApplication.class, args); }}

WebSocketConfig

package com.drawthink.websocket;import org.springframework.context.annotation.Configuration;import org.springframework.messaging.simp.config.MessageBrokerRegistry;import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;import org.springframework.web.socket.config.annotation.StompEndpointRegistry;/** * Created by lincoln on 16-10-25 */@Configuration@EnableWebSocketMessageBrokerpublic class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {  //允许使用socketJs方式访问,访问点为hello,允许跨域  stompEndpointRegistry.addEndpoint("/hello").setAllowedOrigins("*").withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) {  //订阅Broker名称  registry.enableSimpleBroker("/topic","/user");  //全局使用的订阅前缀(客户端订阅路径上会体现出来)  registry.setApplicationDestinationPrefixes("/app/");  //点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/  //registry.setUserDestinationPrefix("/user/"); }}

WebSocketController

package com.drawthink.websocket.controller;import com.drawthink.message.ClientMessage;import com.drawthink.message.ServerMessage;import com.drawthink.message.ToUserMessage;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.handler.annotation.MessageMapping;import org.springframework.messaging.handler.annotation.SendTo;import org.springframework.messaging.simp.SimpMessagingTemplate;import org.springframework.stereotype.Controller;/** * Created by lincoln on 16-10-25 */@Controllerpublic class WebSocketController { @MessageMapping("/welcome") //SendTo 发送至 Broker 下的指定订阅路径 @SendTo("/topic/getResponse") public ServerMessage say(ClientMessage clientMessage){  //方法用于广播测试  System.out.println("clientMessage.getName() = " + clientMessage.getName());  return new ServerMessage("Welcome , "+clientMessage.getName()+" !"); } //注入SimpMessagingTemplate 用于点对点消息发送 @Autowired private SimpMessagingTemplate messagingTemplate; @MessageMapping("/cheat") // 发送的订阅路径为/user/{userId}/message // /user/路径是默认的一个,如果想要改变,必须在config 中setUserDestinationPrefix public void cheatTo(ToUserMessage toUserMessage){  //方法用于点对点测试  System.out.println("toUserMessage.getMessage() = " + toUserMessage.getMessage());  System.out.println("toUserMessage.getUserId() = " + toUserMessage.getUserId());          messagingTemplate.convertAndSendToUser(toUserMessage.getUserId(),"/message",toUserMessage.getMessage()); }}

Vo

package com.drawthink.message;/** * Created by lincoln on 16-10-25 */public class ClientMessage { private String name; public String getName() {  return name; } public void setName(String name) {  this.name = name; }}
package com.drawthink.message;/** * Created by lincoln on 16-10-25 */public class ServerMessage { private String responseMessage; public ServerMessage(String responseMessage) {  this.responseMessage = responseMessage; } public String getResponseMessage() {  return responseMessage; } public void setResponseMessage(String responseMessage) {  this.responseMessage = responseMessage; }}
package com.drawthink.message;/** * Created by lincoln on 16-10-25 */public class ToUserMessage { private String userId; private String message; public String getUserId() {  return userId; } public void setUserId(String userId) {  this.userId = userId; } public String getMessage() {  return message; } public void setMessage(String message) {  this.message = message; }}

Android 客户端

STOMP协议在Android系统中没有默认实现,必须自行去实现。不过好消息是,开源大神们已经完成了Android上使用STOMP协议的实现,所以我们只需要使用就好了。

地址:StompProtocolAndroid_jb51.rar

搭建

build.gradle(app)

apply plugin: 'com.android.application'android { compileSdkVersion 24 buildToolsVersion "24.0.3" defaultConfig {  applicationId "com.drawthink.websocket"  minSdkVersion 16  targetSdkVersion 24  versionCode 1  versionName "1.0"  testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner" } buildTypes {  release {   minifyEnabled false   proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'  } }}dependencies { compile fileTree(include: ['*.jar'], dir: 'libs') androidTestCompile('com.android.support.test.espresso:espresso-core:2.2.2', {  exclude group: 'com.android.support', module: 'support-annotations' }) compile 'com.android.support:appcompat-v7:24.2.1' testCompile 'junit:junit:4.12' //依赖STOMP协议的Android实现 compile 'com.github.NaikSoftware:StompProtocolAndroid:1.1.1' //StompProtocolAndroid 依赖于webSocket的标准实现 compile 'org.java-websocket:Java-WebSocket:1.3.0'}

接收广播实例:

package com.drawthink.websocket;import android.content.Intent;import android.os.Bundle;import android.support.v7.app.AppCompatActivity;import android.util.Log;import android.view.View;import android.widget.Button;import android.widget.EditText;import android.widget.TextView;import android.widget.Toast;import org.java_websocket.WebSocket;import rx.Subscriber;import rx.functions.Action1;import ua.naiksoftware.stomp.LifecycleEvent;import ua.naiksoftware.stomp.Stomp;import ua.naiksoftware.stomp.client.StompClient;import ua.naiksoftware.stomp.client.StompMessage;import static android.content.ContentValues.TAG;public class MainActivity extends AppCompatActivity { private TextView serverMessage; private Button start; private Button stop; private Button send; private EditText editText; private StompClient mStompClient; private Button cheat; @Override protected void onCreate(Bundle savedInstanceState) {  super.onCreate(savedInstanceState);  setContentView(R.layout.activity_main);  bindView();  start.setOnClickListener(new View.OnClickListener() {   @Override   public void onClick(View v) {   //创建client 实例    createStompClient();   //订阅消息    registerStompTopic();   }  });  send.setOnClickListener(new View.OnClickListener() {   @Override   public void onClick(View v) {    mStompClient.send("/app/welcome","{/"name/":/""+editText.getText()+"/"}")      .subscribe(new Subscriber<Void>() {     @Override     public void onCompleted() {      toast("发送成功");     }     @Override     public void onError(Throwable e) {      e.printStackTrace();      toast("发送错误");     }     @Override     public void onNext(Void aVoid) {     }    });   }  });  stop.setOnClickListener(new View.OnClickListener() {   @Override   public void onClick(View v) {    mStompClient.disconnect();   }  });  cheat.setOnClickListener(new View.OnClickListener() {   @Override   public void onClick(View v) {    startActivity(new Intent(MainActivity.this,CheatActivity.class));    if(mStompClient != null) {     mStompClient.disconnect();    }    finish();   }  }); } private void showMessage(final StompMessage stompMessage) {  runOnUiThread(new Runnable() {   @Override   public void run() {    serverMessage.setText("stomp command is --->"+stompMessage.getStompCommand() +" body is --->"+stompMessage.getPayload());   }  }); } //创建client 实例 private void createStompClient() {  mStompClient = Stomp.over(WebSocket.class, "ws://192.168.0.46:8080/hello/websocket");  mStompClient.connect();  Toast.makeText(MainActivity.this,"开始连接 192.168.0.46:8080",Toast.LENGTH_SHORT).show();  mStompClient.lifecycle().subscribe(new Action1<LifecycleEvent>() {   @Override   public void call(LifecycleEvent lifecycleEvent) {    switch (lifecycleEvent.getType()) {     case OPENED:      Log.d(TAG, "Stomp connection opened");      toast("连接已开启");      break;     case ERROR:      Log.e(TAG, "Stomp Error", lifecycleEvent.getException());      toast("连接出错");      break;     case CLOSED:      Log.d(TAG, "Stomp connection closed");      toast("连接关闭");      break;    }   }  }); } //订阅消息 private void registerStompTopic() {  mStompClient.topic("/topic/getResponse").subscribe(new Action1<StompMessage>() {   @Override   public void call(StompMessage stompMessage) {    Log.e(TAG, "call: " +stompMessage.getPayload() );    showMessage(stompMessage);   }  }); } private void toast(final String message) {  runOnUiThread(new Runnable() {   @Override   public void run() {    Toast.makeText(MainActivity.this,message,Toast.LENGTH_SHORT).show();   }  }); } private void bindView() {  serverMessage = (TextView) findViewById(R.id.serverMessage);  start = (Button) findViewById(R.id.start);  stop = (Button) findViewById(R.id.stop);  send = (Button) findViewById(R.id.send);  editText = (EditText) findViewById(R.id.clientMessage);  cheat = (Button) findViewById(R.id.cheat); }}

点对点

package com.drawthink.websocket;import android.os.Bundle;import android.support.v7.app.AppCompatActivity;import android.util.Log;import android.view.View;import android.widget.Button;import android.widget.EditText;import android.widget.LinearLayout;import android.widget.TextView;import android.widget.Toast;import org.java_websocket.WebSocket;import rx.Subscriber;import rx.functions.Action1;import ua.naiksoftware.stomp.LifecycleEvent;import ua.naiksoftware.stomp.Stomp;import ua.naiksoftware.stomp.client.StompClient;import ua.naiksoftware.stomp.client.StompMessage;import static android.content.ContentValues.TAG;public class CheatActivity extends AppCompatActivity { private EditText cheat; private Button send; private LinearLayout message; private StompClient mStompClient; @Override protected void onCreate(Bundle savedInstanceState) {  super.onCreate(savedInstanceState);  setContentView(R.layout.activity_cheat);  bindView();  createStompClient();  registerStompTopic();  send.setOnClickListener(new View.OnClickListener() {   @Override   public void onClick(View v) {   // 向/app/cheat发送Json数据    mStompClient.send("/app/cheat","{/"userId/":/"lincoln/",/"message/":/""+cheat.getText()+"/"}")      .subscribe(new Subscriber<Void>() {       @Override       public void onCompleted() {        toast("发送成功");       }       @Override       public void onError(Throwable e) {        e.printStackTrace();        toast("发送错误");       }       @Override       public void onNext(Void aVoid) {       }      });   }  }); } private void bindView() {  cheat = (EditText) findViewById(R.id.cheat);  send = (Button) findViewById(R.id.send);  message = (LinearLayout) findViewById(R.id.message); } private void createStompClient() {  mStompClient = Stomp.over(WebSocket.class, "ws://192.168.0.46:8080/hello/websocket");  mStompClient.connect();  Toast.makeText(CheatActivity.this,"开始连接 192.168.0.46:8080",Toast.LENGTH_SHORT).show();  mStompClient.lifecycle().subscribe(new Action1<LifecycleEvent>() {   @Override   public void call(LifecycleEvent lifecycleEvent) {    switch (lifecycleEvent.getType()) {     case OPENED:      Log.d(TAG, "Stomp connection opened");      toast("连接已开启");      break;     case ERROR:      Log.e(TAG, "Stomp Error", lifecycleEvent.getException());      toast("连接出错");      break;     case CLOSED:      Log.d(TAG, "Stomp connection closed");      toast("连接关闭");      break;    }   }  }); } // 接收/user/xiaoli/message路径发布的消息 private void registerStompTopic() {  mStompClient.topic("/user/xiaoli/message").subscribe(new Action1<StompMessage>() {   @Override   public void call(StompMessage stompMessage) {    Log.e(TAG, "call: " +stompMessage.getPayload() );    showMessage(stompMessage);   }  }); } private void showMessage(final StompMessage stompMessage) {  runOnUiThread(new Runnable() {   @Override   public void run() {    TextView text = new TextView(CheatActivity.this);    text.setLayoutParams(new LinearLayout.LayoutParams(LinearLayout.LayoutParams.MATCH_PARENT, LinearLayout.LayoutParams.WRAP_CONTENT));    text.setText(System.currentTimeMillis() +" body is --->"+stompMessage.getPayload());    message.addView(text);   }  }); } private void toast(final String message) {  runOnUiThread(new Runnable() {   @Override   public void run() {    Toast.makeText(CheatActivity.this,message,Toast.LENGTH_SHORT).show();   }  }); }}

代码比较乱,说明一下。

1、STOMP 使用的时候,关键是发布订阅的关系,使用过消息队列,例如rabbitMQ的应该很容易理解。

服务器端 WebSocketConfig.Java文件控制的就是订阅发布的路径关系。

2、websocket的路径说明,本例中连接的是ws://192.168.0.46:8080/hello/websocket路径,/hello是在WebSocketConfig的stompEndpointRegistry.addEndpoint(“/hello”).setAllowedOrigins(““).withSockJS();*确定的, 如果有多个endpoint,这个地方的路径也会随之变化。

3、发布路径

发布信息的路径是由WebSocketConfig中的 setApplicationDestinationPrefixes(“/app/”); 和 Controller 中@MessageMapping(“/welcome”) 组合确定的。

例如发广播消息,路径为/app/welcome

例如发点对点消息,路径为/app/cheat

4、消息订阅路径

订阅broker源自WebSocketConfig中的registry.enableSimpleBroker(“/topic”,”/user”);此处开放了两个broker,具体的订阅服务路径给基于Controller中的 @SendTo(“/topic/getResponse”)或SimpMessagingTemplate中给定。(注:此处,服务器和客户端须约定订阅路径)

5、关于心跳

订阅发布模型的心跳很简单,客户端向一个指定的心跳路径发送心跳,服务器处理,服务器使用指定的订阅路径向客户端发心跳,即可。因为没有Socket,只需要记录是否联通的状态即可,重连客户端做一下就好了。

本人菜鸟,肯定有些地方没有搞清楚,如果有误,请大神斧正。

代码下载地址:blogRepository_jb51.rar

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持武林网。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表