原创

Netty服务端启动,创建和初始化channel

下面是一个简单的Netty服务端程序,根据以下程序用于根据源码剖析。

/**
 * Echoes back any received data from a client.
 */

public final class EchoServer {

    static final int PORT = Integer.parseInt(System.getProperty("port""8007"));

    public static void main(String[] args) throws Exception {

        // 主线程组, 用于接受客户端的连接,但是不做任何处理,跟老板一样,不做事
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 从线程组, 老板线程组会把任务丢给他,让手下线程组去做任务
        EventLoopGroup workerGroup2 = new NioEventLoopGroup();

        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            // 服务端配置
            ServerBootstrap b = new ServerBootstrap();
            /**
             * 1. bind 绑定线程组
             * 2. channel 设定通讯模式为NIO, 同步非阻塞
             * 3. option 设定缓冲区大小, 缓存区的单位是字节
             * 4. handler 用来处理服务端通道的请求
             * 5. childHandler 设置过滤器(childHandler是服务的Bootstrap独有的方法。是用于提供处理对象的。
             *              可以一次性增加若干个处理逻辑。是类似责任链模式的处理方式。
             *              增加A,B两个处理逻辑,在处理客户端请求数据的时候,根据A-》B顺序依次处理)
             * 5. bind 绑定端口(ServerBootstrap可以绑定多个监听端口,多次调用bind方法即可)
             * 6. sync 开始监听逻辑(返回一个ChannelFuture,返回结果代表的是监听成功后的一个对应的未来结果。
             *                  可以使用ChannelFuture实现后续的服务器和客户端的交互。)
             */

            ChannelFuture f = b.group(bossGroup, workerGroup2)
                    .channel(NioServerSocketChannel.class) // 底层是通过NioServerSocketChannel类反射获取channel
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            //p.addLast(new LoggingHandler(LogLevel.INFO));
                            p.addLast(serverHandler);
                        }
                    })
                    .bind(PORT)
                    .sync();
            System.out.println(Thread.currentThread().getName() + ">>>>>>>>>>>>>>>>>>");
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup2.shutdownGracefully();
        }
    }
}

b.bind(PORT)跟进最终调用到AbstractBootstrap#doBind这个方法

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

这里就是服务端创建的主要代码

创建Channel

主要概括如下

1.通过demo里传入的NioServerSocketChannel类反射创建channel

2.在NioServerSocketChannel的构造函数里

  • 通过java底层的SelectorProvider创建ServerSocketChannel
  • 调用父类构造函数设置阻塞模式,创建id,unsafe,pipeline等
  • 创建NioServerSocketChannelConfig,便于进行参数配置

方法体里第一行代码的initAndRegister方法就是在创建和初始化channel

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();   //创建channel,本节分析
        init(channel);
    } catch (Throwable t) {
        //省略
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

创建channel就在channelFactory.newChannel()这行代码里,这里会调用到ReflectiveChannelFactory#newChannel这个方法

public T newChannel() {
    try {
        return constructor.newInstance();
    } catch (Throwable t) {
        //省略
    }
}

可以看到这里用反射创建一个channel,也就是说channel是通过channelFactoryconstructor来反射创建的,而channelFactory是在demo里调用channel方法时初始化的,如下

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)   //在这里初始化了`channelFactory
    //省略

AbstractBootstrap#channel方法里

public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
        ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

最终调用到AbstractBootstrap#channelFactory方法里

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    ObjectUtil.checkNotNull(channelFactory, "channelFactory");
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return self();
}

既然这里是通过反射创建了一个NioServerSocketChannel,接下来看看NioServerSocketChannel这个类的构造函数

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));  
}

先看看NioServerSocketChannel#newSocket这个方法

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel(); //provider就是DEFAULT_SELECTOR_PROVIDER,为SelectorProvider.provider()
    } catch (IOException e) {
        //省略
    }
}

上面的构造函数调用到了另一个构造函数

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

super方法调用父类做一些简单的初始化,主要在AbstractNioChannelAbstractChannel这两个类这种

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        //省略
    }
}

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

NioServerSocketChannelConfig则是传入java底层的ServerSocketChannel方便以后对做一些配置

初始化channel

先概括一下,主要做了这几件事

  • 将用户设置的options和attrs设置到channel的config里
  • 向channel的pipeline里添加用户传入的handler
  • 创建一个特殊的handler即ServerBootstrapAcceptor(用于处理新连接接入)并添加进pipeline里

接下来分析初始化channel的过程,即init(channel)这行代码

这里会调用到ServerBootstrap#init这个方法

void init(Channel channel) {
    //newOptionsArray这个方法拿到用户设置的options设置到channel的config里
    setChannelOptions(channel, newOptionsArray(), logger);
    //attrs0这个方法拿到用户设置的attrs设置到channel里
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    //拿到用于配置childHandler的childOptions
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
    }
    //拿到用于配置childHandler的childAttrs
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            //拿到demo里调用childHandler方法时添加的handler
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    //ServerBootstrapAcceptor是一个特殊的handler,用于处理新连接接入
                    pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
正文到此结束
本文目录