Bootstrap 是 Netty 提供的一个便利的工厂类, 我们可以通过它来完成 Netty 的客户端或服务器端的 Netty 初始化.利用BootStrap我们可以实现创建channel,把channel注册在EventLoop上,发起连接等功能.
BootStrap的类结构如下:
1. Client端启动实例
下面是个简单的客户端实例,我们用这个来分析BootStrap的整个流程.
2. group()
|
|
- 这里设置
EventLoopGroup
,是为了以后注册和handle事件做准备,EventLoopGroup
可以理解成一个线程池.在后面注册和handler事件的时候,会从EventLoopGroup
取线程处理.
3. channel()
|
|
- 这里并不是返回channel,而是返回一个channelFactory,利用工厂方法构造channel.而下面这个则是一个channelFactory,他是根据传入的Class,通过反射构造channel.1234567891011121314151617181920public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {private final Class<? extends T> clazz;public ReflectiveChannelFactory(Class<? extends T> clazz) {if (clazz == null) {throw new NullPointerException("clazz");}this.clazz = clazz;}//通过返回获取channel实例public T newChannel() {try {return clazz.getConstructor().newInstance();} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + clazz, t);}}}
4. option()
|
|
为Channel设置一些可选的性质.当value为null的时候表示删除这个option.
5. handler()
1234567public B handler(ChannelHandler handler) {if (handler == null) {throw new NullPointerException("handler");}this.handler = handler;return self();}设置handler,这里handler是用户自定义处理连接逻辑.例如编码器或者自定义的handler.通常来说我们通过
ChannelInitializer
的init
来添加handler.
6. connect()
|
|
先验证各个part是否准备好,然后再发起连接.
12345678910111213141516171819202122232425262728293031private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister(); //1final Channel channel = regFuture.channel(); //获取channelif (regFuture.isDone()) { //异步的结果返回if (!regFuture.isSuccess()) { //不成功return regFuture;}return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());} else {//异步结果还没出来,添加监听器来监听final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause(); //异步结果if (cause != null) {//注册失败promise.setFailure(cause);} else {// 注册成功了.promise.registered();// 发起连接doResolveAndConnect0(channel, remoteAddress, localAddress, promise);}}});return promise;}}上面是整个注册,连接的逻辑.下面这部分单独把注册部分拿出来.
12345678910111213141516171819202122232425final ChannelFuture initAndRegister() {Channel channel = null;try {channel = channelFactory.newChannel(); //创建实例init(channel);} catch (Throwable t) {if (channel != null) {channel.unsafe().closeForcibly();//如果到这里还没注册channel,则强制使用GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}//如果到这里还没注册channel,则强制使用GlobalEventExecutorreturn new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}//在这里异步注册ChannelChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close(); //已经注册成功了} else {channel.unsafe().closeForcibly();}}return regFuture;}上面是整个注册的逻辑,采用是异步的策略,也就是说我们可以在程序中,根据监听器的结果来判断注册是否成功.
12345678910111213141516void init(Channel channel) throws Exception {ChannelPipeline p = channel.pipeline();p.addLast(config.handler());final Map<ChannelOption<?>, Object> options = options0();synchronized (options) {setChannelOptions(channel, options, logger);}final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}}}在这里初始化channel.并向
channelPipeline
中添加handler.为channel设置option和Attribute123456789101112131415161718192021222324252627282930313233343536373839404142434445464748private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,final SocketAddress localAddress, final ChannelPromise promise) {try {//获取到该channel绑定的EventLoopfinal EventLoop eventLoop = channel.eventLoop();final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {//已经解析了,或者没有办法解析.doConnect(remoteAddress, localAddress, promise);return promise;}final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);if (resolveFuture.isDone()) { //返回异步解析的结果final Throwable resolveFailureCause = resolveFuture.cause();if (resolveFailureCause != null) {// 不能立即解析channel.close();promise.setFailure(resolveFailureCause);} else {// 成功解析,则连接doConnect(resolveFuture.getNow(), localAddress, promise);}return promise;}// 没有立刻解析,则添加监听器等待解析的结果resolveFuture.addListener(new FutureListener<SocketAddress>() {public void operationComplete(Future<SocketAddress> future) throws Exception {if (future.cause() != null) {//解析失败channel.close();promise.setFailure(future.cause());} else {// 解析成功,发起连接.doConnect(future.getNow(), localAddress, promise);}}});} catch (Throwable cause) {promise.tryFailure(cause);}return promise;}以上是异步解析地址.
12345678910111213141516171819private static void doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.final Channel channel = connectPromise.channel();channel.eventLoop().execute(new Runnable() {public void run() {//本地地址if (localAddress == null) {channel.connect(remoteAddress, connectPromise);} else {channel.connect(remoteAddress, localAddress, connectPromise);}connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);}});}上面这部分是真正的异步连接服务器.
7. 总结
通过上面的叙述,我们不难看出来,BootStrap所做的3件事.无非在这过程中,多次利用异步来获取结果.
- 创建channel,并初始化
- 注册channel
- 连接到服务器