Netty TCP服务端打印客户端连接数量 4年前

1. 自建 handler,继承 ChannelDuplexHandler

1.1完整代码

package com.lgdz.netty.server;

import com.codahale.metrics.ConsoleReporter; import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.jmx.JmxReporter; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong;

@Slf4j @ChannelHandler.Sharable public class MetricsHandler extends ChannelDuplexHandler {

**private** AtomicLong **totalConnectionNumber** \= **new** AtomicLong();

{

    MetricRegistry metricRegistry = **new** MetricRegistry();

    metricRegistry.register(**"totalConnectionNumber"**, **new** Gauge<Long>() {
        @Override

public Long getValue() { return totalConnectionNumber.longValue(); } });

ConsoleReporter consoleReporter = ConsoleReporter._forRegistry_(metricRegistry).build();
    consoleReporter.start(10, TimeUnit._SECONDS_);    **//10秒检测一次TCP客户端连接数**

    JmxReporter jmxReporter = JmxReporter._forRegistry_(metricRegistry).build();
    jmxReporter.start();

}

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception { totalConnectionNumber.incrementAndGet(); super.channelActive(ctx); }

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception { totalConnectionNumber.decrementAndGet(); super.channelInactive(ctx); }

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { _//_在这里可以处理硬件发送过来的数据 _// log.debug("数据对象长度:" + ((byte[]) msg).length); //java.lang.ClassCastException: java.lang.String cannot be cast to [B _ } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { super.write(ctx, msg, promise); } }

2. 把 metricsHandler 添加到 pipeline。

2.1完整代码

package com.lgdz.netty.server;

import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map;

_/** _ * @Author: _代 _ * _@Description:netty__服务器配置 _ * @Date: _Created in 10:20 2020/12/29 _ */ @Component _//_实现__ApplicationContextAware__以获得__ApplicationContext__中的所有__bean public class NettyServer implements ApplicationContextAware {

**private static final** Logger _logger_ \= LoggerFactory._getLogger_(NettyServer.**class**);
**private** Channel **channel**;
**private** EventLoopGroup **bossGroup**;
**private** EventLoopGroup **workerGroup**;
@Resource

private HelloServerInHandler helloServerInHandler;

**private** Map<String, Object> **exportServiceMap** \= **new** HashMap<String, Object>();

@Value(**"${dai.server.host}"**)
String **host**;

@Value(**"${rpcServer.ioThreadNum:5}"**)
**int** **ioThreadNum**;
_//__内核为此套接口排队的最大连接个数,对于给定的监听套接口,内核要维护两个队列,未链接队列和已连接队列大小总和最大值_

@Value("${rpcServer.backlog:1024}") int backlog;

@Value(**"${dai.server.port}"**)
**int** **port**;

_/\*\*

_ * _启动 _ * @throws _InterruptedException _ */

@PostConstruct public void start() { logger.info("begin to start rpc server"); // 主从 Reactor _多线程模式 _ bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(ioThreadNum);

MetricsHandler metricsHandler  = **new** MetricsHandler();

    ServerBootstrap serverBootstrap = **new** ServerBootstrap();
    serverBootstrap.group(**bossGroup**, **workerGroup**)

            .channel(NioServerSocketChannel.**class**)
            .option(ChannelOption._SO\_BACKLOG_, **backlog**)
            _//__注意是__childOption

_ .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() _//__真实数据最大字节数为__Integer.MAX_VALUE__,解码时自动去掉前面四个字节 _ _//io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(900) + length(176) exceeds writerIndex(1024): UnpooledUnsafeDirectByteBuf(ridx: 900, widx: 1024, cap: 1024) _ .addLast("logging", new LoggingHandler(LogLevel.INFO)) _/* .addLast(new MyCustomMessageDecoder()) _ .addLast(new MyEncode())*/ .addLast(helloServerInHandler)

_/\*       .addLast("decoder",new MyDecode())

_ .addLast("encoder",new MyEncode())*/ /* .addLast(new MyCustomMessageDecoder()) .addLast(new MyEncode())*/ .addLast("metricHandler", metricsHandler); //检测客户端连接数

}
            });

    **try** {
        **channel** \= serverBootstrap.bind(**host**,**port**).sync().channel();
    } **catch** (InterruptedException e) {
        **channel**.close();
        **return**;
    }
    _logger_.info(**"========================================================================================"**);
    _logger_.info(**"NettyRPC server listening on port "** \+ **port** \+ **" and ready for connections..."**);
    _logger_.info(**"========================================================================================"**);
}

@PreDestroy

public void stop() { logger.info("destroy server resources"); if (null == channel) { logger.error("server channel is null"); } bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); channel.closeFuture().syncUninterruptibly(); bossGroup = null; workerGroup = null; channel = null; }

_/\*\*

_ * _利用此方法获取__spring ioc__接管的所有__bean _ * @param _ctx _ * @throws _BeansException _ */ public void setApplicationContext(ApplicationContext ctx) throws BeansException { Map<String, Object> serviceMap = ctx.getBeansWithAnnotation(ServiceExporter.class); // 获取所有带有 ServiceExporter 注解的 Spring Bean logger.info("取到所有的RPC:{}", serviceMap); if (serviceMap != null && serviceMap.size() > 0) { for (Object serviceBean : serviceMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(ServiceExporter.class) .targetInterface() .getName(); logger.info("register service mapping:{}",interfaceName); exportServiceMap.put(interfaceName, serviceBean); } }else{ System.out.println("kong======================================="); } } }

3.效果

3.1

魚丸粗麵
它是表达对规律的渴望,还是对混沌的屈服?
3
发布数
2
关注者
10730
累计阅读

热门教程文档

Swift
54小节
MySQL
34小节
Python
76小节
React
18小节
10.x
88小节
广告