引言
本文介绍网络IO编程的入门部分,Java 的传统BIO Socket编程源码分析,了解如何将BIO阻塞行为accept()
和 read()
改造为非阻塞行为,并且将结合Linux文档介绍其中的机制,文档中描述了如何处理Socket
的accept
,对比Java的Socket实现代码,基本可以发现和Linux行为基本一致。
废话不多说,我们直接开始。
draw.io 文件
本文涉及的个人源码分析绘图均由 draw.io
绘制,源文件如下:
链接:pan.baidu.com/s/1FHAYt4Ax…
提取码:qsmg
什么是Socket?
Socket起源于Unix的一种通信机制,中文通常叫他“套接字”,代表了网络IP和端口,可以看作是通信过程的一个“句柄”。
Socket 也可以理解为网络编程当中的API,编程语言提供了对应的API实现方式,电脑上的网络应用程序也是通过“套接字”完成网络请求接受与应答。
总而言之:Socket是应用层与TCP/IP协议族通信的中间软件抽象层。
图片来源:socket图解 · Go语言中文文档 (topgoer.com)
阻塞式IO模型
在 《UNIX Network Programming》 一书当中,用UDP传输的案例模拟了阻塞式的IO模型,这个模型的概念和Java BIO的阻塞模型类似。
下面函数中应用进程在调用 recvfrom
之后就开始系统调用并且进行阻塞,等待内核把数据准备并且复制完成之后才得到结果,或者等待过程中发生错误返回。
从图片可以看到,在内核工作的整个过程中应用进程无法做其他任何操作。
BIO 通信模型
我们把上面的阻塞IO模型转为IO通信模型,结果如下:
BIO对于每一个客户端进行阻塞等待接收连接,同一个时间只能处理一个Socket请求,并且在构建完成之后通常会分配一个Thread线程为其进行服务。
BIO 阻塞案例代码
BioClientSocket
/**
* BioClientSocket 客户端 Socket实现
* @author Xander
* @version v1.0.0
* @Package : com.zxd.interview.niosource.bio
* @Description : BioClientSocket 客户端 Socket实现
* @Create on : 2023/7/5 09:52
**/@Slf4j
public class BioClientSocket {
public void initBIOClient(String host, int port) {
BufferedReader reader = null;
BufferedWriter writer = null;
Socket socket = null;
String inputContent;
int count = 0;
try {
reader = new BufferedReader(new InputStreamReader(System.in));
socket = new Socket(host, port);
writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
log.info("clientSocket started: " + stringNowTime());
while (((inputContent = reader.readLine()) != null) && count < 2) {
inputContent = stringNowTime() + ": 第" + count + "条消息: " + inputContent + "n";
writer.write(inputContent);//将消息发送给服务端
writer.flush();
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
socket.close();
reader.close();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public String stringNowTime() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return format.format(new Date());
}
public static void main(String[] args) {
BioClientSocket client = new BioClientSocket();
client.initBIOClient("127.0.0.1", 8888);
}/**
运行结果:
clientSocket started: 2023-07-05 10:26:05
7978987 797898 tyuytu */}
BioServerSocket
/**
* ServerSocket 实现
* @author Xander
* @version v1.0.0
* @Package : com.zxd.interview.niosource.bio
* @Description : ServerSocket 实现
* @Create on : 2023/7/5 09:48
**/@Slf4j
public class BioServerSocket {
public void initBIOServer(int port) {
ServerSocket serverSocket = null;//服务端Socket
Socket socket = null;//客户端socket
BufferedReader reader = null;
String inputContent;
int count = 0;
try {
serverSocket = new ServerSocket(port);
log.info(stringNowTime() + ": serverSocket started");
while (true) {
socket = serverSocket.accept();
log.info(stringNowTime() + ": id为" + socket.hashCode() + "的Clientsocket connected");
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
while ((inputContent = reader.readLine()) != null) {
log.info("收到id为" + socket.hashCode() + " " + inputContent);
count++;
}
log.info("id为" + socket.hashCode() + "的Clientsocket " + stringNowTime() + "读取结束");
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if(Objects.nonNull(reader)){
reader.close();
}
if(Objects.nonNull(socket)){
socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}/**
运行结果:
10:25:57.731 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - 2023-07-05 10:25:57: serverSocket started
10:26:08.442 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - 2023-07-05 10:26:08: id为161960012的Clientsocket connected
10:26:29.356 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - 收到id为161960012 2023-07-05 10:26:26: 第0条消息: 7978987
10:26:34.409 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - 收到id为161960012 2023-07-05 10:26:34: 第1条消息: 797898
10:26:38.298 [main] INFO com.zxd.interview.niosource.bio.BioServerSocket - id为161960012的Clientsocket 2023-07-05 10:26:38读取结束
*/
public String stringNowTime() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return format.format(new Date());
}
public static void main(String[] args) {
BioServerSocket server = new BioServerSocket();
server.initBIOServer(8888);
}
}
BIO 阻塞模型中,需要关注的代码主要是这几个:
serverSocket = new ServerSocket(port);
socket = serverSocket.accept();
socket = new Socket(host, port);
从代码中可以看出,客户端在获取Socket建立连接后,通过系统输入输出流完成读写IO操作,服务端则通过系统缓冲流Buffer来提高读写效率。
ServerSocket 中 bind 解读
在具体的解读之前,先看下整个调用的大致流程图。
由于是ServerSocket
服务端先启动,这里先对bind
操作进行解读,bind
操作是在本机的某个端口和IP地址上进行listen监听。
在bind成功之后,服务端进入accept
阻塞等待,此时客户端Socket请求此地址将会进行Socket连接绑定。
我们从ServerSocket
的初始化代码作为入口进行介绍。
//java.net.ServerSocket#ServerSocket(int)
public ServerSocket(int port) throws IOException {
this(port, 50, null);
}
public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException {
setImpl();
// 检查端口是否越界
// 0xFFFF = 15 * 16^3 + 15 * 16^2 + 15 * 16^1 + 15 * 16^0 = **65535**
if (port 0xFFFF)
throw new IllegalArgumentException(
"Port value out of range: " + port);
if (backlog < 1)
backlog = 50;
try {
// 核心部分
bind(new InetSocketAddress(bindAddr, port), backlog);
} catch(SecurityException e) {
close();
throw e;
} catch(IOException e) {
close();
throw e;
}
}
setImpl();
这个方法我们先暂时放到一边,我们简单扫一下其他代码。
在上面的案例代码当中,我们传入的ip
和port
都处在合法的范围内,Socket规定的端口范围是0 - 65525,超过这个范围不允许进行bind
。
上面代码的核心逻辑是bind(xxx)
这一段操作。
bind(new InetSocketAddress(bindAddr, port), backlog);
new InetSocketAddress(bindAddr, port)
在bind
方法调用之前,ServerSocket
会先构建 InetSocketAddress 对象。InetSocketAddress对象构建实际为InetSocketAddressHolder包装类。包装类的作用是可以防止IP
和Port
等敏感字段的外部篡改。
此外从代码可以看到,构建对象会对于IP
和Port
进行二次检查,如果IP地址不存在,会给一个默认值(通常是 0.0.0.0
)。
Creates a socket address from an IP address and a port number.
A valid port value is between 0 and 65535. A port number of zero will let the system pick up an ephemeral port in a bind operation.|
根据IP地址和端口号创建Socket地址。有效的端口值介于0和65535之间。端口号为0时,系统将在绑定操作中使用短暂端口。
InetSocketAddressHolder 对象构建完成之后,接着就进入到核心的bind(SocketAddress endpoint, int backlog)
内部代码。
bind(SocketAddress endpoint, int backlog)
java.net.ServerSocket#bind(java.net.SocketAddress, int)
/**
Binds the ServerSocket to a specific address (IP address and port number).
将ServerSocket绑定到一个特定的地址(IP地址和端口号)。
If the address is null, then the system will pick up an ephemeral port and a valid local address to bind the socket.
如果地址为空,那么系统会选取一个短暂的端口和一个有效的本地地址来绑定套接字。
*/
public void bind(SocketAddress endpoint, int backlog) throws IOException {
// Socket是否已经被关闭
if (isClosed())
throw new SocketException("Socket is closed");
// 判断是否已经绑定
if (!oldImpl && isBound())
throw new SocketException("Already bound");
if (endpoint == null)
// 如果地址为空,给一个默认地址
endpoint = new InetSocketAddress(0);
if (!(endpoint instanceof InetSocketAddress))
throw new IllegalArgumentException("Unsupported address type");
// 类型强转为 InetSocketAddress
InetSocketAddress epoint = (InetSocketAddress) endpoint;
// 如果地址已经被占用了
if (epoint.isUnresolved())
throw new SocketException("Unresolved address");
if (backlog < 1)
backlog = 50;
try {
SecurityManager security = System.getSecurityManager();
if (security != null)
// 端口进行安全检查
security.checkListen(epoint.getPort());
getImpl().bind(epoint.getAddress(), epoint.getPort());
getImpl().listen(backlog);
bound = true;
} catch(SecurityException e) {
bound = false;
throw e;
} catch(IOException e) {
bound = false;
throw e;
}
}
bind 方法是将 ServerSocket 绑定到一个特定的地址(IP地址和端口号), 如果地址为空,那么系统会选取一个临时端口和有效的本地地址来绑定 ServerSocket。
跳过不需要关注的校验代码,在·try
中有三行比较重要的代码。
getImpl().bind(epoint.getAddress(), epoint.getPort());
getImpl().listen(backlog);
bound = true;
这里的代码初步理解是获取一个impl
对象,绑定地址和端口,调用listen
方法传递backlog
。
backlog
这个值的作用可以看下面的地址,这里整理文章内容大致理解:
-
Linux Network Programming, Part 1 (linuxjournal.com)
-
详解socket中的backlog 参数 - 知乎 (zhihu.com)
backlog
主要是和Socket
有关。在Socket编程中listen函数的第二个参数为backlog,用于服务器编程。
listen(sock, backlog);
在TCP 三次握手当中,LISTEN 状态的服务端 Socket 收到 SYN,会建立一个 SYN_REVD 的连接,SYN_REVD 是一个半连接状态,只有在收到客户端的ACK之后才会进入ESTABLISHED,也就是说三次握手的过程必然会经历SYN_REVD和ESTABLISHED两个状态。
针对这两个状态,不同的操作系统有不同实现,在 FressBSD 中 backlog 就是描述状态为 SYN_REVD 和 ESTABLISHED 的所有连接最大数量。
在 Linux 系统当中,使用两个队列 syn queue和 accept queue,这两个队列分别存储状态为SYN_REVD和状态为ESTABLISHED的连接,Llinux2.2及以后,backlog表示accept queue的大小,而syn queue大小由 /proc/sys/net/ipv4/tcp_max_syn_backlog
配置。
可以看到backlog的值直接影响了建立连接的效率。上面代码中backlog=50
,可以认为 accept queue 的容量为 50。
listen
方法执行完成之后,此时将设置bound = true
,代码执行到此处说明Socket
绑定成功了。
现在我们回过头看getImpl().bind(epoint.getAddress(), epoint.getPort());
这块代码工作。
setImpl()
介绍getImpl()
的前提是我们要知道如何set
的,具体代码位于构造方法中一行不起眼的setImpl()
操作。
java.net.ServerSocket#setImpl。
private void setImpl() {
if (factory != null) {
impl = factory.createSocketImpl();
checkOldImpl();
} else {
// No need to do a checkOldImpl() here, we know it's an up to date
// SocketImpl!
impl = new SocksSocketImpl();
}
if (impl != null)
impl.setServerSocket(this);
}
注意,在第一次初始化的时候,SocketImplFactory
是没有被初始化过的,所以走的是else
分支,具体工作是为内部的成员变量 SocketImpl
进行初始化。
/**
* The implementation of this Socket. */
private SocketImpl impl;
SocksSocketImpl
初始化之后,将会设置它的成员变量ServerSocket
为this
引用
if (impl != null)
impl.setServerSocket(this);
这里的处理工作很简单,分别是初始化 SocksSocketImpl ,把当前对象实例的this引用传递给这个初始化的 SocksSocketImpl 的成员变量(这时候自身的引用逸出了)。
了解setImpl
之后,下面这里我们再看看 getImpl()
干了啥。
getImpl()
java.net.ServerSocket#getImpl
代码内容也比较简单,首先检查SocketImpl
是否创建,第一次连接这里为false
,此时会进入createImpl()
方法。
/**
Get the SocketImpl attached to this socket, creating it if necessary.
获取连接到此套接字的SocketImpl,如果有必要,可以创建它。
*/
SocketImpl getImpl() throws SocketException {
if (!created)
createImpl();
return impl;
}
在createImpl()
当中,通常 SocketImpl 已经在构造器初始化完成,这里直接更新 created
状态即可。
void createImpl() throws SocketException {
if (impl == null)
setImpl();
try {
impl.create(true);
created = true;
} catch (IOException e) {
throw new SocketException(e.getMessage());
}
}
setImpl()
和 getImpl()
方法配合,可以确定 SocketImpl 在使用的时候一定是被初始化完成的。
SocketImpl.bind(epoint.getAddress(), epoint.getPort())
下面再来看看它是如何进行下面两项关键操作的:
getImpl().bind(epoint.getAddress(), epoint.getPort());
getImpl().listen(backlog);
在之前的初始化代码中,InetAddress
对象初始化设置了IP
和Port
等参数,现在委托 SocketImpl执行具体bind
操作。
java.net.AbstractPlainSocketImpl#bind
bind
方法是同步的,一开始需要先获取到fdLock
锁,然后判断是否满足Socket
绑定条件,如果满足则利用钩子(NetHooks
) 对象进行前置TCP绑定。
注意,个人发现
NetHooks.beforeTcpBind(fd, address, lport);
方法发现在JDK11之中是一个空方法,而JDK8当中会有一段provider.implBeforeTcpBind(fdObj, address, port);
的调用。
protected synchronized void bind(InetAddress address, int lport)
throws IOException
{
// 获取 fdLock 锁
synchronized (fdLock) {
if (!closePending && (socket == null || !socket.isBound())) {
NetHooks.beforeTcpBind(fd, address, lport);
}
}
// 是否链接本地地址
if (address.isLinkLocalAddress()) {
address = IPAddressUtil.toScopedAddress(address);
}
// 关键
socketBind(address, lport);
// 服务端和客户端的Socket走不同的 if 判断
if (socket != null)
socket.setBound();
if (serverSocket != null)
serverSocket.setBound();
}
加锁部分和核心逻辑无太多干系,我们跳过细枝末节,看socketBind(address, lport);
这部分代码。
fdLock 锁作用:注释说明它用于在增加/减少fdUseCount时锁定。
/* lock when increment/decrementing fdUseCount */
// 在增加/减少fdUseCount时锁定
protected final Object fdLock = new Object();
PlainSocketImpl#socketBind(InetAddress address, int port)
@Override
void socketBind(InetAddress address, int port) throws IOException {
int nativefd = checkAndReturnNativeFD();
if (address == null)
throw new NullPointerException("inet address argument is null.");
// 目前IPv4地址已经分配完毕,所以优先用 IPV6的,并且不支持 IPV4
if (preferIPv4Stack && !(address instanceof Inet4Address))
throw new SocketException("Protocol family not supported");
// 核心操作
bind0(nativefd, address, port, useExclusiveBind);
// 如果是之前 InetAddress 为空默认初始化的端口为0,则重新随机分配一个端口
if (port == 0) {
localport = localPort0(nativefd);
} else {
localport = port;
}
this.address = address;
}
socketBind(address, lport);
方法调用,最后绑定操作为JVM的底层C++操作bind0
。
bind0
属于比较底层的代码,这里我们就不继续探究了,如果读者好奇,可以阅读 HotSpot
的开源实现代码。
static native void bind0(int fd, InetAddress localAddress, int localport,
boolean exclBind)
throws IOException;
从整体上看,上面这一整个bind
操作都是同步完成的,主要逻辑是先做一系列检查,之后调用底层的JVM方法完成Socket
绑定。
画图小结
笔者通过个人理解画了一幅图,主要描述了 bind
操作大致的逻辑,可以看到很多地方都和JVM
的底层C++代码打交道。
有必要说明一下,BIO毕竟是 Java1.0 出来的玩意,看源码我们要抓大放小,后续的JDK提案中,有人提出要收拾这个老古董=-=。
从图中也可以看出,要完成Socket连接构建,必须要获得文件描述符。
ServerSocket中accept解读
ServerSocket
的accpet
是如何阻塞获取连接的?
accept
方法的作用是询问操作系统是否有收到新的Socket
套接字信息,操作过程在操作系统底层调用实现上都是 同步的。
操作系统从Socket
中没有Socket
连接进来怎么办?根据Linux的accept
文档描述,以及Java注释的JavaDoc文档描述,都明确说明此时会在底层操作系统阻塞 。
java.net.ServerSocket#accept
我们从代码层面看看 accept
方法干了啥。
/**
Listens for a connection to be made to this socket and accepts it. The method blocks until a connection is made.
监听并接受与此套接字的连接。该方法会阻塞,直到有一个连接被建立。
A new Socket s is created and, if there is a security manager, the security manager's checkAccept method is called with s.getInetAddress().getHostAddress() and s.getPort() as its arguments to ensure the operation is allowed. This could result in a SecurityException.
一个新的Socket s被创建,如果有一个安全管理器,安全管理器的checkAccept方法被调用,参数是s.getInetAddress().getHostAddress()和s.getPort(),以确保该操作被允许。这可能会导致一个SecurityException。
*/
public Socket accept() throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!isBound())
throw new SocketException("Socket is not bound yet");
Socket s = new Socket((SocketImpl) null);
implAccept(s);
return s;
}
Java Doc 说明了accept()
会进行阻塞,这里疑问比较大的点可能是Socket s = new Socket((SocketImpl) null);
,这行代码为什么又要新建一个Socket
?带着疑问我们继续看implAccept(s);
方法。
java.net.ServerSocket#implAccept
/**
Subclasses of ServerSocket use this method to override accept() to return their own subclass of socket. So a FooServerSocket will typically hand this method an empty FooSocket. On return from implAccept the FooSocket will be connected to a client.
ServerSocket的子类使用这个方法来覆盖accept()(的行为),以返回他们自己的socket子类。比如一个FooServerSocket通常会将一个空的FooSocket交给这个方法。从 implAccept 返回时,FooSocket 将被连接到一个客户端。
*/
protected final void implAccept(Socket s) throws IOException {
SocketImpl si = null;
try {
// 判断新对象 Socketimpl 是否设置
if (s.impl == null)
s.setImpl();
else {
s.impl.reset();
}
// si 指向 Socket 对象的 impl
si = s.impl;
// Socket 对象的 impl 引用 暂时置空
s.impl = null;
// impl 地址和文件描述符初始化
si.address = new InetAddress();
si.fd = new FileDescriptor();
// getImpl() 获取的是 ServerSocket 的 impl,注意不是 Socket的
// 4. 调用 ServerSocket 持有的 SocksSocketImpl 对象完成底层操作系统的 accept 操作
getImpl().accept(si);
// raw fd has been set
// 原始fd已被设置
SocketCleanable.register(si.fd);
// 安全检查
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkAccept(si.getInetAddress().getHostAddress(),
si.getPort());
}
} catch (IOException e) {
// 如果出现底层IO异常,则s.impl = si;把之前临时置空的引用给重置回来
if (si != null)
si.reset();
s.impl = si;
throw e;
} catch (SecurityException e) {
if (si != null)
si.reset();
s.impl = si;
throw e;
}
// 把之前临时置空的引用给重置回来
s.impl = si;
s.postAccept();
}
代码首先进入一个if
判断,检查 new Socket
新对象的Socketimpl是否设置,如果为空则就设置,如果不为空,则reset()
重置。
毫无疑问,这里是刚刚初始化的Socket
,此时Socket.Socketimpl 肯定是没有设置的。
if (s.impl == null)
s.setImpl();
else {
s.impl.reset();
}
首次进入代码通常就是走if
分支。Socket.setImpl
这个方法和ServerSocket
的setImpl
非常像,new Socket
新对象会为自己的 SocketImpl 成员对象进行初始化。
至此,我们画图理解代码操作逻辑:
接下来是一些有点”绕“的操作,建议读者边调试边跟着图示理解:
// 1. si 指向 Socket 对象的 impl
si = s.impl;
// 2. Socket 对象的 impl引用 暂时置空
s.impl = null;
// 3. impl 地址和文件描述符初始化
si.address = new InetAddress();
si.fd = new FileDescriptor();
// getImpl() 获取的是 ServerSocket 的 impl,注意不是 Socket的
// 4. 调用 ServerSocket 持有的 SocksSocketImpl 对象完成底层操作系统的 accept 操作
getImpl().accept(si);
// ....
// 假设此时 accept 获取到连接
s.impl = si;
这里吐槽下老外这种变量命名给规则,啥
si
呀s
,a,b,c,d 的,不画图很容易绕进去。
格外强调下, getImpl()
的 impl对象和 si.impl
对象并不是同一个,这些代码内容非常像但是属于不同的类,切记不要混淆。
代码最后有必定会执行的 s.impl = si;
操作(因为之前暂时把引用“脱钩”了),如果是异常的si
还会进行额外的reset
重置。
s.impl = si;
这里回答一下之前遗留的问题,Socket s = new Socket((SocketImpl) null);
这行代码为什么又要新建一个Socket?
我们观察上面绘制的操作图,s.impl = null;
的执行,此时Socket对象和这个SocketImpl
暂时”失去关联“,这个时候确保哪怕new Socket
对象绑定失败,此时对于SocketImpl
来说根本是无感知的。
换句话说,如果失败了Socket
会完全重置,好像什么都没有发送过,而如果成功了,此时把引用“接回去”,必然得到的可用的Socket
。
这里给一个不恰当的比喻,当年诸葛亮草船借箭,如果有碰到没有借箭的船,极端一点是不是就可以直接”烧了“不要了,而如果“接”到箭自然需要回港“卸货‘”,对于吴国来说,它们只看到“成功”借到箭的船只。
执行getImpl().accept(si);
方法之后,我们在AbstractPlainSocketImpl找到accept方法。我
java.net.AbstractPlainSocketImpl#accept
/**
Accepts connections.
接受连接
*/
protected void accept(SocketImpl s) throws IOException {
acquireFD();
try {
socketAccept(s);
} finally {
releaseFD();
}
}
accept
调用acquireFD();
获取并且植入文件描述符号,加锁获取之后会把fdUseCount 的计数器值+1,表示有一个新增的Socket
连接。
加锁保证 fdUseCount 计数是线程安全的
// "Acquires" and returns the FileDescriptor for this impl
// - "获取 "并返回该植入物的文件描述符。
FileDescriptor acquireFD() {
synchronized (fdLock) {
fdUseCount++;
return fd;
}
}
java.net.PlainSocketImpl#socketAccept
不同的操作系统实现不同,这里仅以个人看到的JDK11版本源码为例。
@Override
void socketAccept(SocketImpl s) throws IOException {
int nativefd = checkAndReturnNativeFD();
if (s == null)
throw new NullPointerException("socket is null");
int newfd = -1;
InetSocketAddress[] isaa = new InetSocketAddress[1];
if (timeout