首页 国际新闻正文

挖机视频,Netty 完成心跳机制与断线重连,下一站婚姻

心跳机制

何为心跳

所谓心跳, 即在 TCP 长衔接中, 客户端和服务器之间守时发送的王中义一种特别的数据包, 告诉对方自己还在线, 以保证 TCP 衔接的有效性.

注:心跳包还有另一个效果,经常被疏忽,即:一个衔接假如长期不必,防火墙或许路由器就会断开该衔接。


怎么完结

中心Handler —— IdleStateHandler

在 Netty 中, 完结心跳机制的关键是 IdleStateHandler, 那么这个 Handler 怎么运用呢? 先看下它的结构器:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}


这儿解说下三个参数的意义:

  • readerIdleTimeSeconds: 读超时. 即当在指定的时刻距离内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事情.
  • writerIdleTimeSeconds: 写超时. 即当在指定的时刻距离内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事情.
  • allIdleTimeSeconds: 读/写超时. 即当在指定的时刻距离内没有读或写操作时, 会触发一个A挖机视频,Netty 完结心跳机制与断线重连,下一站婚姻LL_IDLE 的 IdleStateEvent 事情.


注:这三个参数默许的时刻单位是秒。若需求指定其他时刻单位,能够运用另一个结构办法:IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)


在看下面的完结之前,主张先了解一下IdleStateHandler的完结原理。

下面直接上代码,需求留意的当地,会在代码中经过注释进行阐明。

运用IdleStateHandler完结心跳

下面将运用IdleStateHandler来完结心跳,Client端衔接到Server端后,会循环履行一个使命:随机等候几秒然后ping一下Server即发送一个心跳包。当等候的时刻超越规则时刻,将会发送失利,认为Server端在此之前现已自动断开衔接了。代码如下:

Client端

ClientIdleStateTrigger —— 心跳触发器

类ClientIdleStateTrigger也是一个Handler,仅仅重写了userEventTriggered办法,用于捕获IdleState.WRITER_IDLE事情(未在指守时刻内向服务器发送数据),然后向Server端发送一个心跳包。

/**
*


* 用于捕获{@link IdleState#WRITER_IDLE}事情(未在指守时刻内向服务器发送数据),然后向Server端发送一个心跳包。
*


*/
public class ClientIdleStateTrigger extends ChannelInboundHandlerAdapter {
public static final String HEART_BEAT = "heart beat!";
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.WRITER_IDLE) {
// write heartbeat to server
ctx.write霉组词AndFlush(HEART_BEAT);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}

Pinger —— 心跳发射器

/**
*

客户端衔接到服务器端后,会循环履行一个使命:随机等候几秒,然后ping一下Server端,即发送一个心跳包。


*/
public class Pinger extends ChannelInboundHandlerAdapter {
private Random random = new Random();
private int baseRandom = 8;
private Channel channel;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
this.channel = ctx.channel();
ping(ctx.channel());
}
private void ping(Channel channel) {
int second = Math.max(1, ra王梓一ndom.nextInt(baseRandom));
System.out.println("next heart beat will send after " + second + "s.");
ScheduledFuture
@Override
public void run() {
if (channel.isActive()) {
System.out.println("sending heart beat to the server...");
channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
} else {
System.err.println("The connection had broken, cancel the task that will send a heart beat.");
channel.closeFuture();
throw new RuntimeException();
}
}
}, second, TimeUnit.SECONDS);
future.addListener(new GenericFutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess()) {
ping(channel);
}
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 当Channel现已断开的状况下, 依然发送数据, 会抛反常, 该办法会被调用.
cause.printStackTrace();
ctx.close();
}
}


ClientHandlersInitializer —— 客户端处理器调集的初始化类

public class ClientHandlersInitializer extends ChannelInitializer {
private ReconnectHandler reconnectHandler;
private EchoHandler echoHandler;
public ClientHandlersInitializer(TcpClient tcpClient) {
Assert.notNull(tcpClient, "TcpClient can not be null.");
this.reconnectHandler = new ReconnectHandler(tcpClient);
this.echoHandler = new EchoHandler();
}
@Override
protected void initChannel(SocketChannel ch) throws Exce魅惑冷情令郎ption {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new Pinger());
}
}


注: 上面的Handler调集,除了Pinger,其他都是编解码器和处理粘包,能够疏忽。


TcpClient —— TCP衔接的客户端

public class TcpClient {
private String host;
private int port;
private Bootstrap bootstrap;
/** 将Channel保存起来, 可用于在其他非handler的当地发送数据 */
private Channel channel;
public TcpClient(String host, int port) {
this(host, port, new ExponentialBackOffRetry(1000, Integer.MAX_VALUE, 60 * 1000));
}
public TcpClient(String host, int port, RetryPolicy retryPolicy) {
this.host = host;
this.port = port;
init();
}
/**
* 向长途TCP服务器恳求衔接
*/
public void connect() {
synchronized (bootstrap) {
ChannelFuture future = bootstrap.connect(host, port);
this.channel = future.channel();
}
}
private void init() {
EventLoopGroup group = new NioEventLoopGroup();
// bootstrap 可重用, 只需在TcpClient实例化的时分初始化即可.
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ClientHandlersInitializer(TcpClient.this));
}
public static void main(String[] args) {
TcpClient tcpClient = new TcpClient("localhost", 222别拿班花不妥干部2);
tcpClient.connect();
}
}


Server端

ServerIdleStateTrigger —— 断连触发器

/**
*

在规则时刻内未收到客户端的任何数据包, 将自动断开该衔接


*/
public class ServerIdleStateTrigger extends ChannelInboundHandlerAdapter {
@Override
陆曼薄靳南public void userEventTriggered(ChannelHandlerContext ct人权律师x, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.READER_IDLE) {
// 在规则时刻内没有收到客户端的上行数据, 自动断开衔接
ctx.disconnect();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}


ServerBizHandler —— 服务器端的事务处理器

/**
*

收到来自客户端的数据包后, 直接在操控台打印出来.


*/
@ChannelHandler.Sharable
public挖机视频,Netty 完结心跳机制与断线重连,下一站婚姻 class ServerBizHandler extends SimpleChannelInboundHandler {
private final String REC_HEART_BEAT = "I had received the heart beat!";
@Override
protected void channelRead0(ChannelHandlerContext ctx, String data) throws Exception {
try {
System.out.println("receive data: " + data);
// ctx.writeAndFlush(REC_HEART_BEAT);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Established connection with the remote client.");
// do something
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Disconnected with the remote client.");
// do something
ctx.fireChannelInactive();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}


ServerHandlerInitializer —— 服务器端处理器调集的初始化类

/**
*

用于初始化服务器端涉及到的一切Handler


*/
pu疏狂君莫笑blic class ServerHandlerInitializer extends ChannelInitializer {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(5, 0, 0));
ch.pipeline().addLast("idleStateTrigger", new ServerIdleStateTrigger());
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(4));
ch.pipeline().addLast("decoder", new StringDecoder());
ch.pi下运河风情peline().addLast("encoder", new StringEncoder());
ch.pipeline().addLast("bizHandler", new ServerBizHandler());
}
}


注:new IdleStateHan挖机视频,Netty 完结心跳机制与断线重连,下一站婚姻dler(5, 0, 0)该handler代表假如在5秒内没有收到来自客户端的任何数据包(包含但不限于心跳包),将会自动断开与该客户端的衔接。


TcpServer —— 服务器端

public class TcpServer {
private int port;
private ServerHandlerInitializer serverHandlerInitializer;
public TcpServer(int port) {
this.port = port;
this.serverHandlerInitializer = new ServerHandlerInitializer();
}
public void start() {
EventLoopGroup b北方民族大学图书馆ossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(this.serverHandlerInitializer);
// 绑定端口,开端接纳进来的衔接
ChannelFuture future = bootstrap.bind(port).sync();
System.out.p陈忠铨rintln("Server start listen at " + port);
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
e.printStackTrace()eynak;
}
}
public static void main(String[] args) throws Exception {
int port = 2222;
new TcpServer(port).start();
}
}


至此,一切代码现已编写结束。

测验

首要发动客户端,再发动服务器端。发动完结后,在客户端的操控台上,能够看到打印如下相似日志:


客户端操控台输出的日志


在服务器端能够看到操控台输出了相似如下的日志:


服务器端操控台输出的日志

能够看到,客户端在发送4个心跳包后,第5个包由于等候时刻较长,比及真实发送的时分,发现衔接已断开了;而服务器端收到客户端的4个心跳数据包后,迟迟等不到下一个数据包,所以决断断开该衔接。

反常状况

在测验过程中,有或许会呈现如下状况:


反常状况


呈现这种状况的原因是:在衔接已断开的状况下,依然向服务器端发送心跳包。虽然在发送心跳包之前会运用channel.isActive()判别衔接是否可用,但也有或许上一刻判别成果为可用,但下一刻发送数据包之前,衔接就断了。

现在没有找到高雅处理这种状况的计划,各位看官假如有好的处理计划,还望不吝赐教。感谢!!!

断线重连

断线重连这儿就不过多介绍,信任各位都知道是怎么回事。这儿只说大致思路,然后直接上代码。

完结思路

客户端在监测到与服务器端的衔接断开后,或许一开端就无法衔接的状况下,运用指定的重连战略进行重连操作,直到从头树立衔接或重试次数耗尽。

关于怎么监测衔接是否断开,则是经过重写ChannelInboundHandler#channelInactive来完结,但衔接不可用,该办法会被触发,所以只需求在该办法做好重连作业即可。

代码完结

注:以下代码都是在上面心跳机制的基础上修正/添加的。


由于断线重连是客户端的作业,所以只需对客户端代码进行修正。

重试战略

RetryPolicy —— 重试战略接口

public inter恐龙列车国语版全集face RetryPolicy {
/**
* Called when an operation has failed for some reason. This method should return
* true to make another attempt.
*
* @param retryCount the number of times retried so far (0 the first time)
* @return true/false
*/
boolean allowRetry(int retryCount);
/**
* get sleep time in ms of current retry count.
*
* @param retryCount current retry count
* @return the time to sleep
*/
long getSleepTimeMs(int retryCount);
}


ExponentialBackOffRetry —— 重连战略的默许完结

/**
*

Retry policy that retries a set number of times with increasing sleep time between retries


*/
public class ExponentialBackOffRetry implements RetryPolicy {
private static final int MAX_RETRIES_LIMIT = 29;
private static final int DEFAULT_MAX_SLEEP_MS = Integer.MAX_VALUE;
private final Random random = new Random();
private final long baseSleepTimeMs;
private final int maxRetries;
private final int maxSleepMs;
public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries) {
this(baseSleepTimeMs, maxRetries, DEFAULT_MAX_SLEEP_MS);
}
public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) {
this.maxRetries = maxRetries;
this.baseSleepTimeMs = baseSleepTimeMs;
this.maxSleepMs = maxSleepMs;
}
@Override
public boolean allowRetry(int retryCount) {
if (retryCount < maxRetries) {
return true;
}
return false;
}
@Override
public long getSleepTimeMs(int retryCount) {
if (retryCount < 0) {
throw new IllegalArgumentException("retries count must greater than 0.");
}
if (retryCount > MAX_RETRIES_LIMIT) {
System.out.println(String.format("maxRetries too large (%d). Pinning to %d", maxRetries, MAX_RETRIES_LIMIT));
retryCount = MAX_RETRIES_LIMIT;
}
long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << retryCount));
if (sleepMs > maxSleepMs) {
System.out.pri挖机视频,Netty 完结心跳机制与断线重连,下一站婚姻ntln(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
sleepMs = maxSleepMs;
}
return sleepMs;
}
}


ReconnectHandler—— 重连处理器

@ChannelHandler.Sharable
public class ReconnectHandler extends ChannelInboundHandlerAdapter {
private int retries = 0;
private RetryPolicy retryPolicy;
private TcpClient tc班宇浩微博pClient;
public ReconnectHandler(TcpClient tcpClient) {
this.tcpClient = tcpClient;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Successfully established a connection to the server.");
retries = 0;
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (retries == 0) {
System.err.println("Lost the TCP connection with the server.");
ctx.close();
}挖机视频,Netty 完结心跳机制与断线重连,下一站婚姻
boolean allowRetry = getRetryPolicy().allowRetry(retries);
if (allowRetry) {
long sleepTimeMs = getRetryPolicy().getSleepTimeMs(retries);
System.out.println(String.format("Try to reconnect to the server after %dms. Retry count: %d.", sleepTimeMs, ++retries));
final EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(() -> {
System.out.println("Reconnecting ...");
tcpClient.connect();
}, sleepTimeMs, TimeUnit.MILLISECONDS);
}
ctx.fireChannelInactive();
}
private RetryPol袁咏珊icy getRetryPolicy() {
if (this.retryPolicy == null) {
this.retryPolicy = tcpClient.getRetryPolicy();
}
return this.retryPolicy;
}
}


ClientHandlersInitializer

在之前的基础上,添加了重连处理器ReconnectHandler。

public class ClientHandlersInitia挖机视频,Netty 完结心跳机制与断线重连,下一站婚姻lizer extends ChannelInitializer {
private ReconnectHandler reconnectHandler;
private EchoHandler echoHandler;
public ClientHandlersInitializer(TcpClient tcpClient) {
Assert.notNull(tcpClient, "TcpClient can not be null.");
this.reconnectHandler = new ReconnectHandler(tcpClient);
this.echoHandler = new EchoHandler();
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(this.reconnectHandler);
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new Pinger());
}
}


TcpClient

在之前的基础上添加剧连、重连战略的支撑。

public class TcpClient {
private String host;
private int port;
private Bootstrap bootstrap;
/** 重连战略 */
private RetryPolicy retryPolicy;
/** 将Channel保存起来, 可用于在其他非handler的当地发送数据 */
private Channel channel;
public TcpClient(String host, i男尸吧nt port) {
this(host, port, new ExponentialBackOffRetry(1000, Integer.MAX_VALUE, 60 * 1000));
}
public TcpClient(String host, int port, RetryPolicy retryPolicy) {
this.host = host;
this.port = port;
this.retryPolicy = retryPolicy;
init();
}
/**
* 向长途TC挖机视频,Netty 完结心跳机制与断线重连,下一站婚姻P服务器恳求衔接
*/
public void connect() {
synchronized (bootstrap) {
ChannelFuture future = bootstrap.connect(host, port);
future.addListener(getConnectionListener());
this.channel = future.channel();
}
}
public RetryPolicy getRetryPolicy() {
return retryPolicy;
}
private void init() {
EventLoopGroup group = new NioEventLoopGroup();
// bootstrap 可重用, 只需在TcpClient实例化的时分初始化即可.
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ClientHandlersInitializer(TcpClient.this));
}
private ChannelFutureListener getConnectionListener() {
return new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
future.channel().pipeline().fireChannelInactive();
}
}
};
}
public static void main(String[] args) {
TcpClient tcpClient = new TcpClient("localhost", 2222);
tcpClient.connect();
}
}


测验

在测验之前,为了避开 Connection reset by peer 反常,能够略微修正Pinger的ping()办法,添加if (second == 5)的条件判别。如下:

private void ping(Channel channel) {
int second = Math.max(1, random.nextInt(baseRandom));
if (second == 5) {
second = 6;
}
System.out.println("next heart beat will send after " + second + "s.");
ScheduledFuture
@Override
public void run() {
if (channel.isActive()) {
System.out.println("sending heart beat to the server...");
channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
} else {
System.err.println("The conne官窥笔趣阁ction had broken, cancel the task that will send a heart beat.");
channel.closeFuture();
throw new RuntimeException();
}
}
}, second, TimeUnit.SECONDS);
future.addListener(new GenericFutureListener() {
@Override
public void operationComplete(Fu郎咸平六任妻子相片ture future) throws Exception {
if (future.isSuccess()) {
ping(channel);
}
}
});
}

发动客户端

先只发动客户端,调查操控台输出,能够看到相似如下日志:


断线重连测验——客户端操控台输出


能够看到,当客户端发现无法衔接到服务器端,所以一向测验重连。跟着重试次数添加,重试时刻距离越大,但又不想无限增大下去,所以需求定一个阈值,比方60s。如上snowfallkeypress图所示,当下一次重试时刻超越60s时,会打印Sleep extension too large(*). Pinning to 60000,单位为ms。呈现这句话的意思是,计算出来的时刻超越阈值(60s),所以把真实睡觉的时刻重置为阈值(60s)。

发动服务器端

接着发动服务器端,然后持续调查客户端操控台输出。


断线重连测验——服务器端发动后客户端操控台输出


能够看到,在第9次重试失利后,第10次重试之前,发动的服务器,所以第10次重连的成果为Successfully established a connection to the server.,即成功衔接到服务器。接下来由于仍是不守时ping服务器,所以呈现断线重连、断线重连的循环。

扩展

在不同环境,或许会有不同的重连需求。有不同的重连需求的,只需自己完结RetryPolicy接口,然后在创立TcpClient的时分掩盖默许的重连战略即可。

版权声明

本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。