即时通讯-服务端搭建(一)


前言

之前在做技术积累的时候基于Layim+t-io做了一个即时通讯,由于某种原因,就搁置了,最近抽空将其完善,此系列记录开发过程。

关于t-io

在开始之前建议对于t-io不熟悉的可以去官网看看t-io 网络上对于t-io的解析的文章很多,可以看看,t-io的源码地址

开发环境

  操作系统:Windows 10 X64
  开发工具:IntelliJ IDEA ULTIMATE 2018.3
  JDK:1.8.0_131
  t-io:3.5.7.v20191115-RELEASE
  数据库:MySQL5.7

服务端搭建

在服务搭建之前建议先去网络上对以下几个类进行了解

  1. IWsMsgHandler(websocket事件监听器)

  2. ServerAioHandler(消息的处理)

  3. WsServerAioListener(事件监听)

  4. WsServerConfig(服务端的配置)

  5. ServerTioConfig(当前的上下文)

  6. TioServer(Tio服务)

    本篇文章不会对于t-io进行过多的分析,尽量减少文字的描述,大部分使用代码进行呈现,需要详细的了解t-io的话可以去它的官网进行了解

    详细代码

    引入t-io的包

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    <dependency>
    <groupId>org.t-io</groupId>
    <artifactId>tio-core</artifactId>
    <version>3.5.7.v20191115-RELEASE</version>
    </dependency>

    <dependency>
    <groupId>org.t-io</groupId>
    <artifactId>tio-websocket-server</artifactId>
    <version>3.5.7.v20191115-RELEASE</version>
    </dependency>

ServerSocketStarter.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 服务端启动
* @author lm
*/
@Configuration
public class ServerSocketStarter {
@Autowired
private WebSocketProperties webSocketProperties;
@Bean
public WsStarter serverStarter() throws Exception{
WesServerConfig wesServerConfig = new WesServerConfig(webSocketProperties.getPort());
wesServerConfig.setBindIp(webSocketProperties.getIp());
WsStarter wsStarter=new WsStarter(wesServerConfig,new WesMsgHandler());
//启动程序
wsStarter.start();

return wsStarter;
}
}

WesServerConfig.java

继承org.tio.websocket.server.WsServerConfig

1
2
3
4
5
6
7
8
9
10
11
public class WesServerConfig extends WsServerConfig {


public WesServerConfig(Integer bindPort, boolean useSession) {
super(bindPort, useSession);
}

public WesServerConfig(Integer bindPort) {
super(bindPort);
}
}

WsStarter.java

源码地址:org.tio.websocket.server.WsServerStarter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/**
* 改造原来的启动配置
* 原来的位置:
*/

public class WsStarter {
private static Logger log = LoggerFactory.getLogger(WsServerStarter.class);
//服务端 ip和端口的配置
private WesServerConfig wesServerConfig;
//定义handler,所有的请求数据全部都由这个handler来处理,decode/encode/handler等等
private WesMsgHandler wesMsgHandler;
//服务端的消息处理 所有的请求数据全部都由这个handler来处理,decode/encode/handler等等
private WesServerAioHandler wesServerAioHandler;
//可以在连接上、接收到消息、发送消息后等等回调其内部方法
private WesServerAioListener wesServerAioListener;
//服务端上下文初始化
private ServerTioConfig serverTioConfig;
private TioServer tioServer;

public TioServer getTioServer() {
return this.tioServer;
}

public WesServerConfig getWesServerConfig() {
return this.wesServerConfig;
}

public WesMsgHandler getWesMsgHandler() {
return wesMsgHandler;
}

public void setWesMsgHandler(WesMsgHandler wesMsgHandler) {
this.wesMsgHandler = wesMsgHandler;
}

public WesServerAioHandler getWesServerAioHandler() {
return this.wesServerAioHandler;
}

public WesServerAioListener getWesServerAioListener() {
return this.wesServerAioListener;
}

public ServerTioConfig getServerTioConfig() {
return this.serverTioConfig;
}



public WsStarter(WesServerConfig wesServerConfig, WesMsgHandler wesMsgHandler) throws IOException {
this(wesServerConfig, wesMsgHandler, (SynThreadPoolExecutor)null, (ThreadPoolExecutor)null);
}

public WsStarter(WesServerConfig wesServerConfig, WesMsgHandler wesMsgHandler, SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) throws IOException {
this(wesServerConfig, wesMsgHandler, new WsTioUuid(), tioExecutor, groupExecutor);
}

public WsStarter(WesServerConfig wesServerConfig, WesMsgHandler wesMsgHandler, TioUuid tioUuid, SynThreadPoolExecutor tioExecutor, ThreadPoolExecutor groupExecutor) throws IOException {
this.wesServerConfig = null;
this.wesMsgHandler = null;
this.wesServerAioHandler = null;
this.wesServerAioListener = null;
this.serverTioConfig = null;
this.tioServer = null;
if (tioExecutor == null) {
tioExecutor = Threads.getTioExecutor();
}

if (groupExecutor == null) {
groupExecutor = Threads.getGroupExecutor();
}

this.wesServerConfig = wesServerConfig;
this.wesMsgHandler = wesMsgHandler;
wesServerAioHandler = new WesServerAioHandler(wesServerConfig, wesMsgHandler);
wesServerAioListener = new WesServerAioListener();
serverTioConfig = new ServerTioConfig("Tio Websocket Server", this.wesServerAioHandler, this.wesServerAioListener, tioExecutor, groupExecutor);
serverTioConfig.setHeartbeatTimeout(0L);
serverTioConfig.setTioUuid(tioUuid);
serverTioConfig.setReadBufferSize(30720);
tioServer = new TioServer(this.serverTioConfig);
WebMsgHandler.init();
}

/**
*
* @throws IOException
*/
public void start() throws IOException {
tioServer.start(this.wesServerConfig.getBindIp(), this.wesServerConfig.getBindPort());
}

}

WesMsgHandler.java

实现:org.tio.websocket.server.handler.IWsMsgHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

/**
* websocket事件的监听
*/
public class WesMsgHandler implements IWsMsgHandler {
private MineService mineService;

private FriendOrGroupService friendOrGroupService;

private static final Logger logger = LoggerFactory.getLogger(WesMsgHandler.class);
/**
* 握手时走这个方法,业务可以在这里获取cookie,request参数等
*/
@Override
public HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
return handshakeUser(httpRequest,httpResponse,channelContext);
}

@Override
public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {

}

@Override
public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
return "暂不支持字节消息解析";
}

/**
* 离线or关闭
* @param wsRequest
* @param bytes
* @param channelContext
* @return
* @throws Exception
*/
@Override
public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {

Tio.remove(channelContext,"onClose");
return null;
}

@Override
public Object onText(WsRequest wsRequest, String s, ChannelContext channelContext) throws Exception {
return null;
}
private MineService getMineService(){
mineService= SpringContextUtil.getBean(MineService.class);

return mineService;

}

private FriendOrGroupService getFriendOrGroupService(){
friendOrGroupService= SpringContextUtil.getBean(FriendOrGroupService.class);

return friendOrGroupService;

}

/**
* 握手成功后的处理
* 将用户信息绑定到当前的上下文channelContext中
* */
private HttpResponse handshakeUser( HttpRequest httpRequest,HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
MineService mineService = getMineService();
FriendOrGroupService friendOrGroupService = getFriendOrGroupService();
//获得缓存
String userid=UserUtils.getImid();
//根据登录人的id获取当前登录人的信息
Mine mine= mineService.get(userid);
//将当前登录人的信息保存到layim需要的格式里面
String userId = mine.getId();
channelContext.setAttribute(userId, mine);
//绑定用户ID
Tio.bindUser(channelContext, userId);
//清除缓存
UserUtils.removeImid();

return httpResponse;
}


}

WesServerAioHandler.java

实现:org.tio.core.intf.AioHandler.ServerAioHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
/**
* 消息处理
*/
public class WesServerAioHandler implements ServerAioHandler {

private static Logger log = LoggerFactory.getLogger(WsServerAioHandler.class);

private WesServerConfig wesServerConfig;
private WesMsgHandler wesMsgHandler;

public WesServerAioHandler(WesServerConfig wesServerConfig, WesMsgHandler wesMsgHandler) {
this.wesServerConfig = wesServerConfig;
this.wesMsgHandler = wesMsgHandler;
}
/***
*服务端消息解码
* 将消息的返回值进行封装
* @param buffer
* @param limit
* @param position
* @param readableLength
* @param channelContext
* @return
* @throws AioDecodeException
*/
@Override
public WsRequest decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException {
WsSessionContext wsSessionContext = (WsSessionContext)channelContext.getAttribute();
if (!wsSessionContext.isHandshaked()) {
HttpRequest request = HttpRequestDecoder.decode(buffer, limit, position, readableLength, channelContext, wesServerConfig);
if (request == null) {
return null;
} else {
HttpResponse httpResponse = updateWebSocketProtocol(request, channelContext);
if (httpResponse == null) {
throw new AioDecodeException("http协议升级到websocket协议失败");
} else {
wsSessionContext.setHandshakeRequest(request);
wsSessionContext.setHandshakeResponse(httpResponse);
WsRequest wsRequestPacket = new WsRequest();
wsRequestPacket.setHandShake(true);
log.info("握手成功");
return wsRequestPacket;
}
}
} else {
WsRequest websocketPacket = WsServerDecoder.decode(buffer, channelContext);
return websocketPacket;
}
}


/**
* 编码:把业务消息包编码为可以发送的ByteBuffer
* 总的消息结构:消息头 + 消息体
* 消息头结构: 4个字节,存储消息体的长度
* 消息体结构: 对象的json串的byte[]
*/
@Override
public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
WsResponse wsResponse = (WsResponse)packet;
if (wsResponse.isHandShake()) {
WsSessionContext imSessionContext = (WsSessionContext)channelContext.getAttribute();
HttpResponse handshakeResponse = imSessionContext.getHandshakeResponse();

try {
return HttpResponseEncoder.encode(handshakeResponse, tioConfig, channelContext);
} catch (UnsupportedEncodingException var8) {
log.error(var8.toString(), var8);
return null;
}
} else {
ByteBuffer byteBuffer = WsServerEncoder.encode(wsResponse, tioConfig, channelContext);
return byteBuffer;
}
}

public WsServerConfig getHttpConfig() {
return wesServerConfig;
}

/**
* 进行消息的处理以及转发
* @param websocketPacket
* @param bytes
* @param opcode
* @param channelContext
* @return
* @throws Exception
*/
private WsResponse handler(WsRequest websocketPacket, byte[] bytes, Opcode opcode, ChannelContext channelContext) throws Exception {
WsResponse wsResponse = null;
if (opcode == Opcode.TEXT) {
if (bytes != null && bytes.length != 0) {

//接收到的消息包
String text = new String(bytes, wesServerConfig.getCharset());
JSONObject jsonObject = JSON.parseObject(text);
//获得消息类型
MsgType property = Json.toBean(text, MsgType.class);
//解析数据消息
log.info("消息类型",property.getMsgtype());
MsgHandler msgHandler = WebMsgHandler.getMsg(property.getMsgtype());
boolean unknown = msgHandler == null;
if(!unknown) {
//根据消息类型进行消息的处理
msgHandler.handler(websocketPacket, channelContext);
}
Object retObj = wesMsgHandler.onText(websocketPacket, text, channelContext);
String methodName = "onText";
wsResponse = this.processRetObj(retObj, methodName, channelContext);
return wsResponse;
} else {
Tio.remove(channelContext, "错误的websocket包,body为空");
return null;
}
} else {
Object retObj;
String methodName;
if (opcode == Opcode.BINARY) {
if (bytes != null && bytes.length != 0) {
retObj = wesMsgHandler.onBytes(websocketPacket, bytes, channelContext);
methodName = "onBytes";
wsResponse = this.processRetObj(retObj, methodName, channelContext);
return wsResponse;
} else {
Tio.remove(channelContext, "错误的websocket包,body为空");
return null;
}
} else if (opcode != Opcode.PING && opcode != Opcode.PONG) {
if (opcode == Opcode.CLOSE) {
String text = ByteUtil.toText(bytes);
retObj = wesMsgHandler.onClose(websocketPacket, bytes, channelContext);
methodName = "onClose";
wsResponse = this.processRetObj(retObj, methodName, channelContext);
return wsResponse;
} else {
Tio.remove(channelContext, "错误的websocket包,错误的Opcode");
return null;
}
} else {
log.info("收到" + opcode);
return null;
}
}
}

@Override
public void handler(Packet packet, ChannelContext channelContext) throws Exception {
WsRequest wsRequest = (WsRequest)packet;
if (wsRequest.isHandShake()) {
WsSessionContext wsSessionContext = (WsSessionContext)channelContext.getAttribute();
HttpRequest request = wsSessionContext.getHandshakeRequest();
HttpResponse httpResponse = wsSessionContext.getHandshakeResponse();
HttpResponse r = wesMsgHandler.handshake(request, httpResponse, channelContext);
if (r == null) {
Tio.remove(channelContext, "业务层不同意握手");
} else {
wsSessionContext.setHandshakeResponse(r);
WsResponse wsResponse = new WsResponse();
wsResponse.setHandShake(true);
Tio.send(channelContext, wsResponse);
wsSessionContext.setHandshaked(true);
wesMsgHandler.onAfterHandshaked(request, httpResponse, channelContext);
}
} else {
log.info("进行消息的处理");
WsResponse wsResponse = this.handler(wsRequest, wsRequest.getBody(), wsRequest.getWsOpcode(), channelContext);
if (wsResponse != null) {
Tio.send(channelContext, wsResponse);
}

}
}

private WsResponse processRetObj(Object obj, String methodName, ChannelContext channelContext) throws Exception {
WsResponse wsResponse = null;
if (obj == null) {
return null;
} else if (obj instanceof String) {
String str = (String)obj;
wsResponse = WsResponse.fromText(str, wesServerConfig.getCharset());
return wsResponse;
} else if (obj instanceof byte[]) {
wsResponse = WsResponse.fromBytes((byte[])((byte[])obj));
return wsResponse;
} else if (obj instanceof WsResponse) {
return (WsResponse)obj;
} else if (obj instanceof ByteBuffer) {
byte[] bs = ((ByteBuffer)obj).array();
wsResponse = WsResponse.fromBytes(bs);
return wsResponse;
} else {
log.error("{} {}.{}()方法,只允许返回byte[]、ByteBuffer、WsResponse或null,但是程序返回了{}", new Object[]{channelContext, this.getClass().getName(), methodName, obj.getClass().getName()});
return null;
}
}

public void setHttpConfig(WesServerConfig httpConfig) {
wesServerConfig = httpConfig;
}

public static HttpResponse updateWebSocketProtocol(HttpRequest request, ChannelContext channelContext) {
Map<String, String> headers = request.getHeaders();
String Sec_WebSocket_Key = (String)headers.get("sec-websocket-key");
if (StrUtil.isNotBlank(Sec_WebSocket_Key)) {
String Sec_WebSocket_Key_Magic = Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
byte[] key_array = SHA1Util.SHA1(Sec_WebSocket_Key_Magic);
String acceptKey = BASE64Util.byteArrayToBase64(key_array);
HttpResponse httpResponse = new HttpResponse(request);
httpResponse.setStatus(HttpResponseStatus.C101);
Map<HeaderName, HeaderValue> respHeaders = new HashMap();
respHeaders.put(HeaderName.Connection, HeaderValue.Connection.Upgrade);
respHeaders.put(HeaderName.Upgrade, HeaderValue.Upgrade.WebSocket);
respHeaders.put(HeaderName.Sec_WebSocket_Accept, HeaderValue.from(acceptKey));
httpResponse.addHeaders(respHeaders);
return httpResponse;
} else {
return null;
}
}
}

WesServerAioListener.java

继承:org.tio.websocket.server.WsServerAioListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* 事件监听
*/
public class WesServerAioListener extends WsServerAioListener {
private static Logger log = LoggerFactory.getLogger(WesServerAioListener.class);


@Override
public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception {
super.onAfterConnected(channelContext, isConnected, isReconnect);
if (log.isInfoEnabled()) {
log.info("onAfterConnected\r\n{}", channelContext);
}

}

@Override
public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception {
super.onAfterSent(channelContext, packet, isSentSuccess);
if (log.isInfoEnabled()) {
log.info("onAfterSent\r\n{}\r\n{}", packet.logstr(), channelContext);
}
}

@Override
public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
super.onBeforeClose(channelContext, throwable, remark, isRemove);
if (log.isInfoEnabled()) {
log.info("onBeforeClose\r\n{}", channelContext);
}

WsSessionContext wsSessionContext = (WsSessionContext) channelContext.get();

if (wsSessionContext != null && wsSessionContext.isHandshaked()) {

int count = Tio.getAll(channelContext.tioConfig).getObj().size();

String msg = channelContext.getClientNode().toString() + " 离开了,现在共有【" + count + "】人在线";
}
}

@Override
public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception {
super.onAfterDecoded(channelContext, packet, packetSize);
if (log.isInfoEnabled()) {
log.info("onAfterDecoded\r\n{}\r\n{}", packet.logstr(), channelContext);
}
}

@Override
public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception {
super.onAfterReceivedBytes(channelContext, receivedBytes);
if (log.isInfoEnabled()) {
log.info("onAfterReceivedBytes\r\n{}", channelContext);
}
}

@Override
public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception {
super.onAfterHandled(channelContext, packet, cost);
if (log.isInfoEnabled()) {
log.info("onAfterHandled\r\n{}\r\n{}", packet.logstr(), channelContext);
}
}
}

WebSocketProperties.java(端口和网址的配置)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

/**
* @author 端口和网址的配置
*/

@Component
@ConfigurationProperties(prefix = "websocket")
public class WebSocketProperties {
/**
* 端口
*/
private int port;
/**
* ip地址
*/
private String ip;



public int getPort() {
return port;
}

public void setPort(int port) {
this.port = port;
}

public String getIp() {
return ip;
}

public void setIp(String ip) {
this.ip = ip;
}


}

说明

WebSocketProperties类的配置一些配置是在application.yml文件中定义的

1
2
3
websocket:
ip: 192.168.xx.xxx #服务器的IP地址
port: 8089

结束

启动项目后出现如下图内容,证明服务端配置成功

由于部分内容待完善,源码暂时无法提供,等功能完善后会提供源码。
此篇内容仅仅只是记录自己在完成该功能的时候的一些过程,以及自己的思路。其中也遇到一些问题,通过网上查找了相关一些资料,如有不对的地方或者待完善的地方,请多多指教。

参考内容

-------------本文结束感谢您的阅读-------------