Dubbo3单端口发布多协议服务

2023年 10月 11日 47.7k 0

前言

Dubbo3开始支持在单个端口上监听多个协议的不同服务。 比如使用Triple协议启动端口复用后,可以在相同的端口上为服务增加 Dubbo协议支持,以及Qos协议支持。
这些协议的识别都是由一个统一的端口复用服务器进行处理的,可以用于服务的协议迁移,并且可以节约端口以及相关的资源,减少运维的复杂性。

NettyPortUnificationServer

开启端口复用后,Dubbo服务启动时默认会启动NettyPortUnificationServer,它在doOpen()方法里会启动ServerBootstrap,对于新连接的ChannelPipeline只添加了一个NettyPortUnificationServerHandler,用于协议检测。

protected void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline p = ch.pipeline();
    final NettyPortUnificationServerHandler puHandler;
    // 端口统一处理器 多协议服务复用单个端口
    puHandler = new NettyPortUnificationServerHandler(getUrl(), sslContext, true, getProtocols(),
        NettyPortUnificationServer.this, NettyPortUnificationServer.this.dubboChannels,
        getSupportedUrls(), getSupportedHandlers());
    p.addLast("negotiation-protocol", puHandler);
}

因为一个端口要对外提供多协议服务,在连接没有建立前,服务端也不知道客户端到底使用哪种协议,此时也无法过多的配置ChannelPipeline,只能先添加一个协议检测的Handler,后续确认具体协议后,再针对性的配置ChannelPipeline,并且把当前协议检测的Handler移除。

NettyPortUnificationServerHandler

该处理器的主要职责是:检测客户端使用的协议、针对性的配置ChannelPipeline、移除协议检测Handler。
image.png
它继承自ByteToMessageDecoder,具备解码器的能力,它会在decode()检测协议。

绝大多数协议都会在报文的前几个字节带上一个魔数、或者发送一个魔法字符串,作为协议的标识。例如Dubbo协议的魔数是0xdabb,HTTP2协议会发送一个魔法字符串PRI * HTTP/2.0rnrnSMrnrn
Dubbo正是基于此来实现协议检测的。

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out)
    throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    if (in.readableBytes()  0xdabb
             *   triple > PRI * HTTP/2.0rnrnSMrnrn
             */
            final ProtocolDetector.Result result = protocol.detector().detect(buf);
            in.resetReaderIndex();
            switch (result) {
                case UNRECOGNIZED:
                    continue;
                case RECOGNIZED:
                    String protocolName = url.getOrDefaultFrameworkModel().getExtensionLoader(WireProtocol.class)
                        .getExtensionName(protocol);
                    ChannelHandler localHandler = this.handlerMapper.getOrDefault(protocolName, handler);
                    URL localURL = this.urlMapper.getOrDefault(protocolName, url);
                    channel.setUrl(localURL);
                    NettyConfigOperator operator = new NettyConfigOperator(channel, localHandler);
                    protocol.configServerProtocolHandler(url, operator);
                    ctx.pipeline().remove(this);// 协议一旦确认 当前Handler就没用了
                case NEED_MORE_DATA:
                    return;
                default:
                    return;
            }
        }
        byte[] preface = new byte[in.readableBytes()];
        in.readBytes(preface);
        Set supported = url.getApplicationModel()
            .getExtensionLoader(WireProtocol.class)
            .getSupportedExtensions();
        LOGGER.error(INTERNAL_ERROR, "unknown error in remoting module", "", String.format("Can not recognize protocol from downstream=%s . "
                + "preface=%s protocols=%s", ctx.channel().remoteAddress(),
            Bytes.bytes2hex(preface),
            supported));
        in.clear();
        ctx.close();
    }
}

ProtocolDetector

协议检测的规则交给了org.apache.dubbo.remoting.api.ProtocolDetector接口,以Dubbo协议为例,规则是检测前2个字节是否是0xdabb

public class DubboDetector implements ProtocolDetector {
    private final ChannelBuffer Preface = new ByteBufferBackedChannelBuffer(
        ByteBuffer.wrap(new byte[]{(byte)0xda, (byte)0xbb})
    );

    @Override
    public Result detect(ChannelBuffer in) {
        int prefaceLen = Preface.readableBytes();
        int bytesRead = min(in.readableBytes(), prefaceLen);

        if (bytesRead ==0 || !ChannelBuffers.prefixEquals(in,  Preface,  bytesRead)) {
            return Result.UNRECOGNIZED;
        }
        if (bytesRead == prefaceLen) {
            return Result.RECOGNIZED;
        }

        return Result.NEED_MORE_DATA;
    }
}

Dubbo3主推的Triple协议和grpc协议都是基于HTTP2的,所以如何检测客户端使用的是HTTP2协议呢?HTTP2规定了,客户端建立连接后,首先要发送一个魔法字符串,用于确认服务端支持HTTP2,这个魔法字符串内容是PRI * HTTP/2.0rnrnSMrnrn

public class Http2ProtocolDetector implements ProtocolDetector {
    private final ChannelBuffer clientPrefaceString = new ByteBufferBackedChannelBuffer(
        Http2CodecUtil.connectionPrefaceBuf().nioBuffer());

    @Override
    public Result detect(ChannelBuffer in) {
        int prefaceLen = clientPrefaceString.readableBytes();
        int bytesRead = min(in.readableBytes(), prefaceLen);

        // If the input so far doesn't match the preface, break the connection.
        if (bytesRead == 0 || !ChannelBuffers.prefixEquals(in, clientPrefaceString, bytesRead)) {
            return Result.UNRECOGNIZED;
        }
        if (bytesRead == prefaceLen) {
            return Result.RECOGNIZED;
        }
        return Result.NEED_MORE_DATA;
    }
}

Http2CodecUtil.connectionPrefaceBuf()内容就是魔法字符串。

private static final ByteBuf CONNECTION_PREFACE =
    unreleasableBuffer(directBuffer(24)
    .writeBytes("PRI * HTTP/2.0rnrnSMrnrn".getBytes(UTF_8)))
    asReadOnly();

配置Handler

协议一旦检测确认,协议检测的Handler对于子Channel来说就没用了,会从ChannelPipeline中移除。
协议确认后,会通过WireProtocol#configServerProtocolHandler()来配置ChannelPipeline。
因此已经知道客户端使用哪种协议调用服务了,所以就可以针对性的配置编解码器、业务处理等Handler了。
以HTTP2为例,核心是需要配置Http2FrameCodec对HTTP2 Frame编解码、配置Http2MultiplexHandler以支持多路复用。

public void configServerProtocolHandler(URL url, ChannelOperator operator) {
    Configuration config = ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultApplicationModel());
    final List headFilters;
    if (filtersLoader != null) {
        headFilters = filtersLoader.getActivateExtension(url, HEADER_FILTER_KEY);
    } else {
        headFilters = Collections.emptyList();
    }
    final Http2FrameCodec codec = Http2FrameCodecBuilder.forServer()
        .gracefulShutdownTimeoutMillis(10000)
        .initialSettings(new Http2Settings().headerTableSize(
                config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, DEFAULT_SETTING_HEADER_LIST_SIZE))
            .maxConcurrentStreams(
                config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, Integer.MAX_VALUE))
            .initialWindowSize(
                config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, DEFAULT_WINDOW_INIT_SIZE))
            .maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, DEFAULT_MAX_FRAME_SIZE))
            .maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY,
                DEFAULT_MAX_HEADER_LIST_SIZE)))
        .frameLogger(SERVER_LOGGER)
        .build();
    final Http2MultiplexHandler handler = new Http2MultiplexHandler(
        new ChannelInitializer() {
            @Override
            protected void initChannel(Channel ch) {
                final ChannelPipeline p = ch.pipeline();
                p.addLast(new TripleCommandOutBoundHandler());
                p.addLast(new TripleHttp2FrameServerHandler(frameworkModel, lookupExecutor(url),
                    headFilters));
            }
        });
    List handlers = new ArrayList();
    handlers.add(new ChannelHandlerPretender(codec));// Http2 Frame 编解码器
    handlers.add(new ChannelHandlerPretender(new TripleServerConnectionHandler()));// 处理 PING GOAWAY
    handlers.add(new ChannelHandlerPretender(handler));// 请求处理器
    handlers.add(new ChannelHandlerPretender(new TripleTailHandler()));// 避免内存泄漏
    operator.configChannelHandler(handlers);
}

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论