短連接
概念
client與server通過三次握手建立連接,client發送請求消息,server返回響應,一次連接就完成了。
這時候雙方任意都可以發起close操作,不過一般都是client先發起close操作。由于短連接一般只會在 client/server 間傳遞一次請求操作,因此短連接的特點是連接生命周期短暫,連接建立和斷開的開銷較大,適用于單次請求響應的場景。
短連接的優缺點
管理起來比較簡單,存在的連接都是有用的連接,不需要額外的控制手段。
使用場景
通常情況下,當瀏覽器訪問服務器時,采用的是短連接的方式。
對于服務端而言,長連接會消耗大量的資源,而且用戶使用瀏覽器對服務端的訪問頻率相對較低。如果同時存在幾十萬甚至上百萬的連接,則服務端的壓力將非常巨大,甚至可能導致崩潰。
因此,針對并發量高但請求頻率低的情況,建議使用短連接。
為了優化這種情況,可以考慮以下方法:
1. 進行連接池管理:使用連接池來管理與服務端的連接,避免每次請求都建立和關閉連接,減少資源的消耗。
2. 使用緩存機制:將一些不經常變動且占用資源較多的數據進行緩存,減少對服務端的請求,提高性能。
3. 引入負載均衡:通過負載均衡技術將請求分發到多個服務器上,均衡服務器的壓力,提高整體的處理能力。
4. 優化服務端架構:對服務端進行優化,如增加服務器的處理能力、調整服務器配置等,以提高服務端的并發處理能力。
長連接
什么是長連接
客戶端向服務器發起連接,服務器接受客戶端連接并建立雙方連接。
客戶端和服務器完成一次讀寫后,它們之間的連接不會主動關閉,并可以繼續使用該連接進行后續的讀寫操作。
長連接的生命周期
在正常情況下,一條TCP長連接建立后,只要雙方不提出關閉請求并且不出現異常情況,這條連接會一直存在。操作系統不會主動關閉它,即使在經過物理網絡拓撲的改變之后仍然可以使用。因此,一條連接可以保持幾天、幾個月、幾年甚至更長時間,只要沒有異常情況或用戶(應用層)主動關閉。
客戶端和服務端可以一直使用該連接進行數據通信。
長連接的優點
使用長連接可以減少TCP建立和關閉操作,從而減少網絡阻塞。即使發生錯誤,也不需要關閉連接就能進行提示,這樣可以減少CPU和內存的使用,因為不需要頻繁地建立和關閉連接。
長連接的缺點
連接數過多時,影響服務端的性能和并發數量。
使用場景
數據庫的連接就是采用TCP長連接.
RPC,遠程服務調用,在服務器,一個服務進程頻繁調用另一個服務進程,可使用長連接,減少連接花費的時間。
總結
1.對于長連接和短連接的使用是需要根據應用場景來判斷的
2.長連接并不是萬能的,也是需要維護的,
長連接的實現
心跳機制
應用層協議通常會使用心跳機制來保持客戶端與服務器的連接,并確保客戶端仍然在線。典型的心跳協議如IM協議(例如QQ、MSN、飛信)會定期發送數據包給服務器,同時傳輸一些可能必要的數據。
在TCP協議中,也有一個心跳機制,即TCP選項中的SO_KEEPALIVE。系統默認設置為2小時發送一次心跳包。但是這個機制無法檢測機器斷電、網線拔出或防火墻等導致的斷線情況。此外,邏輯層處理斷線情況也可能不夠完善。通常情況下,如果只是用于保活目的,SO_KEEPALIVE機制仍然是可以接受的。
請注意以下優化建議:
1. 調整心跳頻率:根據實際情況,可以根據應用需求調整心跳頻率。太頻繁的心跳包可能造成額外的網絡負擔,而太不頻繁則可能延遲檢測到斷線情況。
2. 使用應用層心跳機制:考慮使用應用層心跳機制,而不僅僅依賴于TCP的SO_KEEPALIVE。應用層心跳機制能夠更靈活地處理不同情況下的斷線問題,并能夠傳遞更多的必要數據。
3. 完善斷線處理邏輯:在應用層實現斷線處理邏輯,包括重新連接、重發未成功的數據等。確保斷線后客戶端能夠盡快恢復連接,并保持數據的完整性和一致性。
4. 測試和監控:定期測試心跳機制的有效性,并監控斷線情況以及處理效果。及時發現并解決可能存在的問題。
為什么需要心跳機制?
由于網絡的不可靠性,TCP長連接可能會在某些突發情況下斷開,例如網線被拔出或突然掉電。在這種情況下,如果服務器和客戶端之間沒有交互,它們不能立即發現對方已掉線。為解決這個問題,可以引入心跳機制。
TCP協議的KeepAlive機制
默認KeepAlive狀態是不打開的。
需要將setsockopt將SOL_SOCKET.SO_KEEPALIVE設置為1才是打開KeepAlive狀態,
并且可以設置三個參數:
tcp_keepalive_time ,tcp_keepalive_probes , tcp_keepalive_intvl,
分別表示:連接閑置多久開始發keepalive的ack包、發幾個ack包不回復才當對方已斷線、兩個ack包之間的間隔。
很多網絡設備,尤其是NAT路由器,由于其硬件的限制(例如內存、CPU處理能力),無法保持其上的所有連接,因此在必要的時候,會在連接池中選擇一些不活躍的連接踢掉。
典型做法是LRU,把最久沒有數據的連接給T掉。
通過使用TCP的KeepAlive機制(修改那個time參數),可以讓連接每隔一小段時間就產生一些ack包,以降低被踢掉的風險,當然,這樣的代價是額外的網絡和CPU負擔。
如何實現心跳機制?
兩種方式實現心跳機制:
- 使用 TCP 協議層面的 keepalive 機制.
- 在應用層上實現自定義的心跳機制.
雖然在 TCP 協議層面上, 提供了 keepalive 保活機制, 但是使用它有幾個缺點:
- 它不是 TCP 的標準協議, 并且是默認關閉的.
- TCP keepalive 機制依賴于操作系統的實現, 默認的 keepalive 心跳時間是 兩個小時, 并且對 keepalive 的修改需要系統調用(或者修改系統配置), 靈活性不夠.
- TCP keepalive 與 TCP 協議綁定, 因此如果需要更換為 UDP 協議時, keepalive 機制就失效了.
使用 TCP 層面的 keepalive 機制比自定義的應用層心跳機制節省流量,
本文的主要介紹應用層方面實現心跳機制,使用netty實現心跳和斷線重連。
netty實現心跳機制
netty對心跳機制提供了機制,實現的關鍵是IdleStateHandler先來看一下他的構造函數
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}實例化一個 IdleStateHandler 需要提供三個參數:
- readerIdleTimeSeconds, 讀超時. 即當在指定的時間間隔內沒有從 Channel 讀取到數據時, 會觸發一個 READER_IDLE 的 IdleStateEvent 事件.
- writerIdleTimeSeconds, 寫超時. 即當在指定的時間間隔內沒有數據寫入到 Channel 時, 會觸發一個 WRITER_IDLE 的 IdleStateEvent 事件.
- allIdleTimeSeconds, 讀和寫都超時. 即當在指定的時間間隔內沒有讀并且寫操作時, 會觸發一個 ALL_IDLE 的 IdleStateEvent 事件.
netty心跳流程

1. 客戶端成功連接服務端。
2.在客戶端中的ChannelPipeline中加入IdleStateHandler,設置寫事件觸發事件為5s.
3.客戶端超過5s未寫數據,觸發寫事件,向服務端發送心跳包,
4.同樣,服務端要對心跳包做出響應,其實給客戶端最好的回復就是“不回復”,減輕服務端的壓力
5.超過三次,1過0s服務端都會收到來自客戶端的心跳信息,服務端可以認為客戶端掛了,可以close鏈路。
6.客戶端恢復正常,發現鏈路已斷,重新連接服務端。
代碼實現
服務端handler:
package com.heartbreak.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.util.Random;
/\*\*
\* @author janti
\* @date 2018/6/10 12:21
\*/
public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {
// 失敗計數器:未收到client端發送的ping請求
private int unRecPingTimes = 0;
// 定義服務端沒有收到心跳消息的最大次數
private static final int MAX\_UN\_REC\_PING\_TIMES = 3;
private Random random = new Random(System.currentTimeMillis());
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
if (msg!=null && msg.equals("Heartbeat")){
System.out.println("客戶端"+ctx.channel().remoteAddress()+"--心跳信息--");
}else {
System.out.println("客戶端----請求消息----:"+msg);
String resp \= "商品的價格是:"+random.nextInt(1000);
ctx.writeAndFlush(resp);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event \= (IdleStateEvent) evt;
if (event.state()==IdleState.READER\_IDLE){
System.out.println("===服務端===(READER\_IDLE 讀超時)");
// 失敗計數器次數大于等于3次的時候,關閉鏈接,等待client重連
if (unRecPingTimes >= MAX\_UN\_REC\_PING\_TIMES) {
System.out.println("===服務端===(讀超時,關閉chanel)");
// 連續超過N次未收到client的ping消息,那么關閉該通道,等待client重連
ctx.close();
} else {
// 失敗計數器加1
unRecPingTimes++;
}
}else {
super.userEventTriggered(ctx,evt);
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
System.out.println("一個客戶端已連接");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
System.out.println("一個客戶端已斷開連接");
}
}服務端server:
package com.heartbreak.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.\*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/\*\*
\* @author tangj
\* @date 2018/6/10 10:46
\*/
public class HeartBeatServer {
private static int port = 9817;
public HeartBeatServer(int port) {
this.port = port;
}
ServerBootstrap bootstrap \= null;
ChannelFuture f;
// 檢測chanel是否接受過心跳數據時間間隔(單位秒)
private static final int READ\_WAIT\_SECONDS = 10;
public static void main(String args\[\]) {
HeartBeatServer heartBeatServer \= new HeartBeatServer(port);
heartBeatServer.startServer();
}
public void startServer() {
EventLoopGroup bossgroup \= new NioEventLoopGroup();
EventLoopGroup workergroup \= new NioEventLoopGroup();
try {
bootstrap \= new ServerBootstrap();
bootstrap.group(bossgroup, workergroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HeartBeatServerInitializer());
// 服務器綁定端口監聽
f = bootstrap.bind(port).sync();
System.out.println("server start ,port: "+port);
// 監聽服務器關閉監聽,此方法會阻塞
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossgroup.shutdownGracefully();
workergroup.shutdownGracefully();
}
}
private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline \= ch.pipeline();
// 監聽讀操作,讀超時時間為5秒,超過5秒關閉channel;
pipeline.addLast("ping", new IdleStateHandler(READ\_WAIT\_SECONDS, 0, 0, TimeUnit.SECONDS));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new HeartbeatServerHandler());
}
}
}客戶端handler
package com.heartbreak.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/\*\*
\* @author tangj
\* @date 2018/6/11 22:55
\*/
public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String>{
private HeartBeatClient client;
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd");
private static final ByteBuf HEARTBEAT\_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
CharsetUtil.UTF\_8));
public HeartBeatClientHandler(HeartBeatClient client) {
this.client = client;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("收到服務端回復:"+msg);
if (msg.equals("Heartbeat")) {
ctx.write("has read message from server");
ctx.flush();
}
ReferenceCountUtil.release(msg);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state \= ((IdleStateEvent) evt).state();
if (state == IdleState.WRITER\_IDLE) {
ctx.writeAndFlush(HEARTBEAT\_SEQUENCE.duplicate());
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
System.err.println("客戶端與服務端斷開連接,斷開的時間為:"+format.format(new Date()));
// 定時線程 斷線重連
final EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(new Runnable() {
@Override
public void run() {
client.doConncet();
}
}, 10, TimeUnit.SECONDS);
}
}客戶端啟動:
package com.heartbreak.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.\*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/\*\*
\* @author tangj
\* @date 2018/6/10 16:18
\*/
public class HeartBeatClient {
private Random random = new Random();
public Channel channel;
public Bootstrap bootstrap;
protected String host = "127.0.0.1";
protected int port = 9817;
public static void main(String args\[\]) throws Exception {
HeartBeatClient client \= new HeartBeatClient();
client.run();
client.sendData();
}
public void run() throws Exception {
EventLoopGroup group \= new NioEventLoopGroup();
try {
bootstrap \= new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new SimpleClientInitializer(HeartBeatClient.this));
doConncet();
} catch (Exception e) {
e.printStackTrace();
}
}
/\*\*
\* 發送數據
\* @throws Exception
\*/
public void sendData() throws Exception {
BufferedReader in \= new BufferedReader(new InputStreamReader(System.in));
while (true){
String cmd \= in.readLine();
switch (cmd){
case "close" :
channel.close();
break;
default:
channel.writeAndFlush(in.readLine());
break;
}
}
}
/\*\*
\* 連接服務端
\*/
public void doConncet() {
if (channel != null && channel.isActive()) {
return;
}
ChannelFuture channelFuture \= bootstrap.connect(host, port);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture futureListener) throws Exception {
if (channelFuture.isSuccess()) {
channel \= futureListener.channel();
System.out.println("connect server successfully");
} else {
System.out.println("Failed to connect to server, try connect after 10s");
futureListener.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
doConncet();
}
}, 10, TimeUnit.SECONDS);
}
}
});
}
private class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {
private HeartBeatClient client;
public SimpleClientInitializer(HeartBeatClient client) {
this.client = client;
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline \= socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 5, 0));
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("handler", new HeartBeatClientHandler(client));
}
}
}運行結果:
1.客戶端長時間未發送心跳包,服務端關閉連接
server start ,port: 9817
一個客戶端已連接
\===服務端===(READER\_IDLE 讀超時)
\===服務端===(READER\_IDLE 讀超時)
\===服務端===(READER\_IDLE 讀超時)
\===服務端===(READER\_IDLE 讀超時)
\===服務端===(讀超時,關閉chanel)
一個客戶端已斷開連接2.客戶端發送心跳包,服務端和客戶端保持心跳信息
一個客戶端已連接
客戶端/127.0.0.1:55436--心跳信息--
客戶端/127.0.0.1:55436--心跳信息--
客戶端/127.0.0.1:55436--心跳信息--
客戶端/127.0.0.1:55436--心跳信息--3.服務單宕機,斷開連接,客戶端進行重連
客戶端與服務端斷開連接,斷開的時間為:2018-06-12 23:47:12
Failed to connect to server, try connect after 10s
Failed to connect to server, try connect after 10s
Failed to connect to server, try connect after 10s
connect server successfully代碼地址:

