分类目录归档:socket

zmq、czmq及其他相关动态库的编译和生成

zeromq是非常优秀的开源库,但是由于作者感觉visual studio的编译工具维护起来相当费事,当然也可能是作者更多的是在非windows下工作的原因吧,msvc build不再维护了,这导致了很多在windows下的用户编译zmq非常不方便。尝试了一下,libzmq\builds\deprecated-msvc\vs20xx的工具的确很多已经不工作了。
下面记录一下通过cmake编译windows下的工程的方法及步骤。
我在项目中用到了zmq、czmq、zyre,由于后两个项目都要依赖前一个项目,所以我们需要统一管理起来。
1、建立一个zeromq_v4.3.1目录(我编译的是4.3.1,所以最好带上版本号,方便后期维护);

2、克隆代码
cmd进入此目录,执行:

https://github.com/zeromq/libzmq.git

将libzmq克隆到zeromq_v4.3.1目录下;

执行:

https://github.com/zeromq/czmq.git

将czmq克隆到zeromq_v4.3.1目录下;

执行:

https://github.com/jedisct1/libsodium.git

将libsodium克隆到zeromq_v4.3.1目录下。
libsodium是一个加密库,czmq将要依赖它,所以这里预先准备好。
执行:

https://github.com/zeromq/zyre.git

将zyre克隆到zeromq_v4.3.1目录下。

3、cmake配置zmq库
cd进入libzmq目录,执行:

cmake -H. -Bbuild -G"Visual Studio 14 2015 Win64"

通常cmake都能成功,这就在build目录下生成了一个vs2015的编译工程,名称是ZeroMQ.sln

需要注意的是,ZeroMQ.sln生成的zmq文件名是带版本号的,类似libzmq-mt-gd-4_3_1.dll这样的格式。这会导致我们项目引用配置的频繁更改,也不利于通过dll替换的方式升级zmq,所以把zmq的CMakeLists.txt配置改一下,让他按libzmq.dll文件名的方式生成:

if(MSVC)
# Suppress linker warnings caused by #ifdef omission
# of file content.
set(CMAKE_STATIC_LINKER_FLAGS “${CMAKE_STATIC_LINKER_FLAGS} /ignore:4221”)
set(PDB_OUTPUT_DIRECTORY “${CMAKE_CURRENT_BINARY_DIR}/bin”)
set(PDB_NAME “libzmq${MSVC_TOOLSET}-mt-gd-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}”)
function(enable_vs_guideline_checker target)
set_target_properties(${target} PROPERTIES
VS_GLOBAL_EnableCppCoreCheck true
VS_GLOBAL_CodeAnalysisRuleSet CppCoreCheckRules.ruleset
VS_GLOBAL_RunCodeAnalysis true)
endfunction()
if(BUILD_SHARED)
add_library(libzmq SHARED ${sources} ${public_headers} ${html-docs} ${readme-docs} ${CMAKE_CURRENT_BINARY_DIR}/NSIS.template.in ${CMAKE_CURRENT_BINARY_DIR}/version.rc)
if(ENABLE_ANALYSIS)
enable_vs_guideline_checker(libzmq)
endif()
set_target_properties(libzmq PROPERTIES
PUBLIC_HEADER “${public_headers}”
#RELEASE_POSTFIX “${MSVC_TOOLSET}-mt-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}”
#RELWITHDEBINFO_POSTFIX “${MSVC_TOOLSET}-mt-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}”
#MINSIZEREL_POSTFIX “${MSVC_TOOLSET}-mt-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}”
#DEBUG_POSTFIX “${MSVC_TOOLSET}-mt-gd-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}”
RUNTIME_OUTPUT_DIRECTORY “${CMAKE_RUNTIME_OUTPUT_DIRECTORY}”
COMPILE_DEFINITIONS “DLL_EXPORT”
OUTPUT_NAME “libzmq”)
endif()

if(BUILD_STATIC)
add_library(libzmq-static STATIC ${sources} ${CMAKE_CURRENT_BINARY_DIR}/version.rc)
set_target_properties(libzmq-static PROPERTIES
PUBLIC_HEADER “${public_headers}”
#RELEASE_POSTFIX “${MSVC_TOOLSET}-mt-s-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}”
#RELWITHDEBINFO_POSTFIX “${MSVC_TOOLSET}-mt-s-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}”
#MINSIZEREL_POSTFIX “${MSVC_TOOLSET}-mt-s-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}”
#DEBUG_POSTFIX “${MSVC_TOOLSET}-mt-sgd-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}”
COMPILE_FLAGS “/DZMQ_STATIC”
OUTPUT_NAME “libzmq”)
endif()
else()

……

就是把一些’POSTFIX’注释掉了,让他不要自动在文件名后串接版本号和编译器等信息。

当然还有个方法就是手动修改项目属性,直接在visual studio中,在libzmq工程上mouse right button -> Properties,在弹出来的属性框中,手动把 Configuration Properties -> General -> Target Name 改成 “libzmq”,记得把需要的configuration都改一下(Debug、Release、RelWithDebInfo…)。另外还得把lib文件名也改了:
Configuration Properties -> Linker -> Advanced -> Import Library 改成 “xxxxxxx/libzmq.lib”
同样,如果需要pdb文件,把pdb文件名也改了:
Configuration Properties -> Linker -> Debugging -> Generate Program Database File 改成 “xxxxxxx/libzmq.pdb”。

4、生成zmq库
编译ZeroMQ.sln,一般都能正确编译的。

5、cmake配置czmq库
czmq也抛弃了windows,所以也得自己配置。
czmq配置相对麻烦一些,因为他要依赖zmq,所以我们需要改一下czmq目录下的Findlibzmq.cmake文件,让cmake能正确找到我们刚才编译的zmq的lib文件。

################################################################################
#  THIS FILE IS 100% GENERATED BY ZPROJECT; DO NOT EDIT EXCEPT EXPERIMENTALLY  #
#  Read the zproject/README.md for information about making permanent changes. #
################################################################################

if (NOT MSVC)
    include(FindPkgConfig)
    pkg_check_modules(PC_LIBZMQ "libzmq")
    if (PC_LIBZMQ_FOUND)
        # add CFLAGS from pkg-config file, e.g. draft api.
        add_definitions(${PC_LIBZMQ_CFLAGS} ${PC_LIBZMQ_CFLAGS_OTHER})
        # some libraries install the headers is a subdirectory of the include dir
        # returned by pkg-config, so use a wildcard match to improve chances of finding
        # headers and SOs.
        set(PC_LIBZMQ_INCLUDE_HINTS ${PC_LIBZMQ_INCLUDE_DIRS} ${PC_LIBZMQ_INCLUDE_DIRS}/*)
        set(PC_LIBZMQ_LIBRARY_HINTS ${PC_LIBZMQ_LIBRARY_DIRS} ${PC_LIBZMQ_LIBRARY_DIRS}/*)
    endif(PC_LIBZMQ_FOUND)
else()
	set(PC_LIBZMQ_INCLUDE_DIRS ../libzmq/include)
	set(PC_LIBZMQ_LIBRARY_DIRS ../libzmq/build/lib/Release)	
	set(PC_LIBZMQ_INCLUDE_HINTS ${PC_LIBZMQ_INCLUDE_DIRS} ${PC_LIBZMQ_INCLUDE_DIRS}/*)
	set(PC_LIBZMQ_LIBRARY_HINTS ${PC_LIBZMQ_LIBRARY_DIRS} ${PC_LIBZMQ_LIBRARY_DIRS}/*)
endif (NOT MSVC)

message("######## ${PC_LIBZMQ_INCLUDE_HINTS}")
message("######## ${PC_LIBZMQ_LIBRARY_HINTS}")

find_path (
    LIBZMQ_INCLUDE_DIRS
    NAMES zmq.h
    HINTS ${PC_LIBZMQ_INCLUDE_HINTS}
)

find_library (
    LIBZMQ_LIBRARIES
    NAMES libzmq
    HINTS ${PC_LIBZMQ_LIBRARY_HINTS}
)

include(FindPackageHandleStandardArgs)

find_package_handle_standard_args(
    LIBZMQ
    REQUIRED_VARS LIBZMQ_LIBRARIES LIBZMQ_INCLUDE_DIRS
)
mark_as_advanced(
    LIBZMQ_FOUND
    LIBZMQ_LIBRARIES LIBZMQ_INCLUDE_DIRS
)

################################################################################
#  THIS FILE IS 100% GENERATED BY ZPROJECT; DO NOT EDIT EXCEPT EXPERIMENTALLY  #
#  Read the zproject/README.md for information about making permanent changes. #
################################################################################

czmq依赖的另一个库是libsodium,还好,libsodium的vs编译还在,打开libsodium\builds\msvc\vs2015目录下的libsodium.sln文件,可直接生成对应的libsodium.dll和libsodium.lib
然后在czmq目录下找到Findlibsodium.cmake文件,修改如下:

################################################################################
#  THIS FILE IS 100% GENERATED BY ZPROJECT; DO NOT EDIT EXCEPT EXPERIMENTALLY  #
#  Please refer to the README for information about making permanent changes.  #
################################################################################

if (NOT MSVC)
    include(FindPkgConfig)
    pkg_check_modules(PC_LIBSODIUM "libsodium")
    if (NOT PC_LIBSODIUM_FOUND)
        pkg_check_modules(PC_LIBSODIUM "libsodium")
    endif (NOT PC_LIBSODIUM_FOUND)
    if (PC_LIBSODIUM_FOUND)
        # some libraries install the headers is a subdirectory of the include dir
        # returned by pkg-config, so use a wildcard match to improve chances of finding
        # headers and SOs.
        set(PC_LIBSODIUM_INCLUDE_HINTS ${PC_LIBSODIUM_INCLUDE_DIRS} ${PC_LIBSODIUM_INCLUDE_DIRS}/*)
        set(PC_LIBSODIUM_LIBRARY_HINTS ${PC_LIBSODIUM_LIBRARY_DIRS} ${PC_LIBSODIUM_LIBRARY_DIRS}/*)
    endif(PC_LIBSODIUM_FOUND)
else()
	set(PC_LIBSODIUM_INCLUDE_DIRS ${CMAKE_CURRENT_SOURCE_DIR}/../libsodium/src/libsodium/include)
	set(PC_LIBSODIUM_LIBRARY_DIRS ${CMAKE_CURRENT_SOURCE_DIR}/../libsodium/bin/x64/Release/v140/dynamic)	
	set(PC_LIBSODIUM_INCLUDE_HINTS ${PC_LIBSODIUM_INCLUDE_DIRS} ${PC_LIBSODIUM_INCLUDE_DIRS}/*)
	set(PC_LIBSODIUM_LIBRARY_HINTS ${PC_LIBSODIUM_LIBRARY_DIRS} ${PC_LIBSODIUM_LIBRARY_DIRS}/*)
endif (NOT MSVC)

find_path (
    LIBSODIUM_INCLUDE_DIRS
    NAMES sodium.h
    HINTS ${PC_LIBSODIUM_INCLUDE_HINTS}
)

find_library (
    LIBSODIUM_LIBRARIES
    NAMES libsodium
    HINTS ${PC_LIBSODIUM_LIBRARY_HINTS}
)

include(FindPackageHandleStandardArgs)

find_package_handle_standard_args(
    LIBSODIUM
    REQUIRED_VARS LIBSODIUM_LIBRARIES LIBSODIUM_INCLUDE_DIRS
)
mark_as_advanced(
    LIBSODIUM_FOUND
    LIBSODIUM_LIBRARIES LIBSODIUM_INCLUDE_DIRS
)

################################################################################
#  THIS FILE IS 100% GENERATED BY ZPROJECT; DO NOT EDIT EXCEPT EXPERIMENTALLY  #
#  Please refer to the README for information about making permanent changes.  #
################################################################################

6、生成czmq库
这个就简单了,打开编译就行啦

7、zyre依赖zmq和czmq,参考czmq,把对应的依赖项Findlibzmq.cmake、Findczmq.cmake、Findlibsodium.cmake改为对应的目录和文件就行了。

8、czmq的版本号问题
czmq通过cmake编译完的windows dll是不带版本号的,也就是从dll文件的属性里看不到版本号信息,这让我们在使用时很难确定其版本号,还好builds/msvc目录下有个resource.rc文件,只要在如下地方把这个rc文件加进去就可以了:

# shared
if (CZMQ_BUILD_SHARED)
  IF (MSVC)
    add_library(czmq SHARED ${czmq_sources} ${CMAKE_CURRENT_SOURCE_DIR}/builds/msvc/resource.rc)
  ELSE (MSVC)
    add_library(czmq SHARED
 
lt;TARGET_OBJECTS:czmq_objects>) ENDIF (MSVC) set_target_properties (czmq PROPERTIES PUBLIC_HEADER "${public_headers}" DEFINE_SYMBOL "CZMQ_EXPORTS" SOVERSION "4" VERSION "${CZMQ_VERSION}" COMPILE_DEFINITIONS "DLL_EXPORT" OUTPUT_NAME "czmq" PREFIX "lib" ) target_link_libraries(czmq PUBLIC ${MORE_LIBRARIES} ) install(TARGETS czmq EXPORT czmq-targets LIBRARY DESTINATION "lib${LIB_SUFFIX}" # .so file ARCHIVE DESTINATION "lib${LIB_SUFFIX}" # .lib file RUNTIME DESTINATION bin # .dll file ) target_include_directories(czmq PUBLIC
 
lt;BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
 
lt;INSTALL_INTERFACE:include> ) endif() # static if (CZMQ_BUILD_STATIC) IF (MSVC) add_library(czmq-static STATIC ${czmq_sources} ${CMAKE_CURRENT_SOURCE_DIR}/builds/msvc/resource.rc) ELSE (MSVC) add_library(czmq-static STATIC
 
lt;TARGET_OBJECTS:czmq_objects>) ENDIF (MSVC) set_target_properties(czmq-static PROPERTIES PUBLIC_HEADER "${public_headers}" COMPILE_DEFINITIONS "CZMQ_STATIC" OUTPUT_NAME "czmq" PREFIX "lib" ) target_link_libraries(czmq-static PUBLIC ${MORE_LIBRARIES} ) install(TARGETS czmq-static EXPORT czmq-targets LIBRARY DESTINATION "lib${LIB_SUFFIX}" # .so file ARCHIVE DESTINATION "lib${LIB_SUFFIX}" # .lib file RUNTIME DESTINATION bin # .dll file ) target_include_directories(czmq-static PUBLIC
 
lt;BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
 
lt;INSTALL_INTERFACE:include> ) target_compile_definitions(czmq-static PUBLIC CZMQ_STATIC ) endif()

Android SDK DDMS通过Wifi接收Android日志

将Android手机与PC通过USB连接,我们就可以在Android Device Monitor中看到Android手机的实时日志,这个对程序的调试非常有帮助。但是一般来讲手机只有一个usb口,如果usb被别的设备占用,我们就没法获取日志了。

实际上,Android Device Monitor是通过手机端的ADB服务与PC通讯来显示日志的。adb非常流弊,指令丰富,功能齐全,是黑进手机的必备工具。这里就用到了adb的网络功能。要想拿到日志,必须在手机端启动adb的tcp服务,然后在PC端通过adb与手机的adb服务连接,这样我们就可以通过网络获取日志了,而不是usb有线方式获取日志。

具体步骤如下:

step 1. 将手机通过usb连接到PC机

step 2. 通过adb的tcpip命令启动一个tcp监听服务

step 3. 通过adb的connect指令从PC端连接Android手机

step 4. 启动Dalvik Debug Monitor,就会看到日志从网络上发动到了本机

 

具体操作如下:

D:\Android>adb tcpip 8630
* daemon not running. starting it now on port 5037 *
* daemon started successfully *
restarting in TCP mode port: 8630

D:\Android>adb connect 192.168.3.42:8630
unable to connect to 192.168.3.42:8630

 

 

D:\Android>adb devices
List of devices attached
90a16a93 device
D:\Android>adb connect 192.168.3.42:8630
unable to connect to 192.168.3.42:8630

D:\Android>adb tcpip 9999
restarting in TCP mode port: 9999

D:\Android>adb connect 192.168.3.42:9999
connected to 192.168.3.42:9999

 

参考:http://stackoverflow.com/questions/2604727/how-can-i-connect-to-android-with-adb-over-tcp

 

svn、bugzilla等穿过防火墙访问:如何在 Windows 防火墙中打开端口

如果 Windows 防火墙阻止某一程序,而您希望允许该程序通过防火墙进行通信,通常可以通过在 Windows 防火墙允许的程序列表(也称为“例外列表”)中选中该程序来实现。若要了解如何进行此操作,请参阅允许程序通过 Windows 防火墙进行通信

但是,如果没有列出该程序,则可能需要打开一个端口。例如,当您与朋友联机进行多人游戏时,可能需要为该游戏打开一个端口,这样防火墙才能允许游戏信息到达您的计算机。端口始终保持打开状态,因此请确保关闭不需要打开的端口。

  1. 通过单击「开始」按钮 「开始」按钮的图片,然后单击“控制面板”,打开“Windows 防火墙”。 在搜索框中,键入防火墙,然后单击“Windows 防火墙”

  2. 在左窗格中,单击“高级设置” 需要管理员权限 如果系统提示您输入管理员密码或进行确认,请键入该密码或提供确认。

  3. “高级安全 Windows 防火墙”对话框的左窗格中,单击“入站规则”,然后在右窗格中,单击“新建规则”

  4. 按照新建入站规则向导中的说明进行操作。

如果您无法通过 Windows 防火墙让其他计算机与您的计算机通信,则可以尝试使用“传入连接”疑难解答自动查找并修复一些常见问题。

通过单击“开始”按钮 「开始」按钮的图片,然后单击“控制面板”,打开“传入的连接”疑难解答。在搜索框中,键入疑难解答,然后单击“疑难解答”。单击“查看全部”,然后单击“传入的连接”

服务器开发惯例和对现有开源项目的一些看法

上一篇转发的文章,作者提到了他的另一篇关于连接服务器的文章, 塞外浪子做了评价,我觉得还是比较中肯:

 

先保证通讯模块的稳定性和性能,然后再往下构架。。。毕竟是一劳永逸的事情。
关于服务器开发,必须要做到网络与逻辑分开。逻辑处理线程池可以根据各自需求整合到网络引擎中,或者整合到逻辑中。
我一般写服务器都是跨平台的,window下面用iocp,linux下用epoll,或者poll+多线程。
   线程池部分直接整合到网络引擎中。
   逻辑部分采用插件模式,即插即用,关于逻辑中的共享数据我将单独保存。

以上模型我就写了一次,用了3年,当然随着网络技术不断发展我会不断支持各自新的网络技术。
关于ACE或者各种开源引擎,我觉得好比是一件漂亮的衣服,但是要用的话很难控制甚至达不到开发者的初衷,发生问题的时候你只能等待新版本的更新了,这方面我以前可是吃尽了苦头。

小弟愚见!祝愿风云游戏作品越来越好,也希望有一天我写的游戏能够尽快稳定。

 

 

ZeroMQ 的模式

在需要并行化处理数据的时候,采用消息队列通讯的方式来协作,比采用共享状态的方式要好的多。Erlang ,Go 都使用这一手段来让并行任务之间协同工作。

最近读完了 ZeroMQGuide。写的很不错。前几年一直有做类似的工作,但是自己总结的不好。而 ZeroMQ 把消息通讯方面的模式总结的很不错。

ZeroMQ 并不是一个对 socket 的封装,不能用它去实现已有的网络协议。它有自己的模式,不同于更底层的点对点通讯模式。它有比 tcp 协议更高一级的协议。(当然 ZeroMQ 不一定基于 TCP 协议,它也可以用于进程间和进程内通讯。)它改变了通讯都基于一对一的连接这个假设。

ZeroMQ 把通讯的需求看成四类。其中一类是一对一结对通讯,用来支持传统的 TCP socket 模型,但并不推荐使用。常用的通讯模式只有三类。

  1. 请求回应模型。由请求端发起请求,并等待回应端回应请求。从请求端来看,一定是一对对收发配对的;反之,在回应端一定是发收对。请求端和回应端都可以是 1:N 的模型。通常把 1 认为是 server ,N 认为是 Client 。ZeroMQ 可以很好的支持路由功能(实现路由功能的组件叫作 Device),把 1:N 扩展为 N:M (只需要加入若干路由节点)。从这个模型看,更底层的端点地址是对上层隐藏的。每个请求都隐含有回应地址,而应用则不关心它。
  2. 发布订阅模型。这个模型里,发布端是单向只发送数据的,且不关心是否把全部的信息都发送给订阅端。如果发布端开始发布信息的时候,订阅端尚未连接上来,这些信息直接丢弃。不过一旦订阅端连接上来,中间会保证没有信息丢失。同样,订阅端则只负责接收,而不能反馈。如果发布端和订阅端需要交互(比如要确认订阅者是否已经连接上),则使用额外的 socket 采用请求回应模型满足这个需求。
  3. 管道模型。这个模型里,管道是单向的,从 PUSH 端单向的向 PULL 端单向的推送数据流。

任何分布式,并行的需求,都可以用这三种模型组合起来解决问题。ZeroMQ 只专注和解决了消息通讯这一基本问题,干的非常漂亮。

基于定义好的模型,我们可以看到,api 可以实现的非常简单易用。我们不再需要 bind/listen/accept 来架设服务器,因为这个模型天然是 1:N 而不是 1:1 的,不需要为每个通道保留一个句柄。我们也不必在意 server 是否先启动(bind),而后才能让 client 工作起来(connect)。

这以上模型中,关注的是通讯双方的职责,而不是实现的方式:监听端口还是连接对方端口。对于复杂的多进程协同工作的系统, 不必纠结于进程启动的次序(在我前几年设计的系统中,启动脚本写起来就非常麻烦)。

使用 ZeroMQ 不必在意底层实现是使用短连接还是长连接方式。ZeroMQ 中的 Transient (短暂) 和 Durable (持久) socket 也并非区别于实现层是否保持了 tcp 连接。而是概念上的不同。对于 Durable socket ,其生命期可以长于一个进程的生命期,即使进程退出,再次启动后依旧可以维持继续之前的 socket 。当然,这并不是帮助你挽救你的程序因出错而崩溃的。它只是提出这个模式,让你根据设计需要可以实现。对于 ZeroMQ ,如有需求(若内存有限),甚至把数据传输的 buffer 放到磁盘上。


对于网络游戏,我觉得基于 ZeroMQ 来架构服务器非常合适。对于玩家 Client – Server 部分倒不必使用 ZeroMQ ,而可以写一个前端程序,比如前些年写过的一篇 blog 中提到的连接服务器依然适用。这个连接服务器对内的服务集群则可以用 ZeroMQ 的协议通讯。

本文来自:云风的 BLOG

Epoll介绍和程序实例

1. Epoll 是何方神圣?

Epoll 可是当前在 Linux 下开发大规模并发网络程序的热门人选, Epoll在 Linux2.6内核中正式引入,和 select
相似,其实都 I/O多路复用技术而已 ,并没有什么神秘的。

其实在 Linux 下设计并发网络程序,向来不缺少方法,比如典型的 Apache 模型( Process Per Connection ,简称 PPC ), TPC ( Thread Per Connection)模型,以及 select 模型和 poll 模型,那为何还要再引入 Epoll 这个东东呢?那还是有得说说的 …

2. 常用模型的缺点

如果不摆出来其他模型的缺点,怎么能对比出Epoll 的优点呢。

2.1 PPC/TPC 模型

这两种模型思想类似,就是让每一个到来的连接一边自己做事去,别再来烦我
。只是 PPC 是为它开了一个进程,而 TPC 开了一个线程。可是别烦我是有代价的,它要时间和空间啊,连接多了之后,那么多的进程 / 线程切换,这开销就上来了;因此这类模型能接受的最大连接数都不会高,一般在几百个左右。

2.2 select 模型

1. 最大并发数限制,因为一个进程所打开的 FD
(文件描述符)是有限制的,由 FD_SETSIZE 设置,默认值是 1024/2048 ,因此 Select 模型的最大并发数就被相应限制了。自己改改这个 FD_SETSIZE ?想法虽好,可是先看看下面吧…

2. 效率问题, select 每次调用都会线性扫描全部的 FD 集合,这样效率就会呈现线性下降,把 FD_SETSIZE 改大的后果就是,大家都慢慢来,什么?都超时了??!!

3. 内核 / 用户空间 内存拷贝问题,如何让内核把 FD 消息通知给用户空间呢?在这个问题上 select采取了内存拷贝方法。

2.3 poll 模型

基本上效率和 select 是相同的, select 缺点的 2 和 3 它都没有改掉。

3. Epoll 的提升

把其他模型逐个批判了一下,再来看看 Epoll 的改进之处吧,其实把 select 的缺点反过来那就是 Epoll 的优点了。

3.1. Epoll 没有最大并发连接的限制,上限是最大可以打开文件的数目,这个数字一般远大于 2048, 一般来说这个数目和系统内存关系很大 ,具体数目可以 cat /proc/sys/fs/file-max 察看。

3.2. 效率提升, Epoll 最大的优点就在于它只管你“活跃”的连接
,而跟连接总数无关,因此在实际的网络环境中, Epoll 的效率就会远远高于 select 和 poll 。

3.3. 内存拷贝, Epoll 在这点上使用了“共享内存 ”,这个内存拷贝也省略了。

4. Epoll 为什么高效
Epoll 的高效和其数据结构的设计是密不可分的,这个下面就会提到
首先回忆一下 select 模型,当有 I/O 事件到来时, select 通知应用程序有事件到了快去处理,而应用程序必须轮询所有的 FD 集合,测试每个 FD 是否有事件发生,并处理事件;代码像下面这样:

[code lang=”c”]
int res = select(maxfd+1, &readfds, NULL, NULL, 120);
if (res > 0)
{
for (int i = 0; i < MAX_CONNECTION; i++)
{
if (FD_ISSET(allConnection[i], &readfds))
{
handleEvent(allConnection[i]);
}
}
}
// if(res == 0) handle timeout, res < 0 handle error
[/code]

Epoll 不仅会告诉应用程序有I/0事件到来,还会告诉应用程序相关的信息,这些信息是应用程序填充的,因此根据这些信息应用程序就能直接定位到事件,而不必遍历整个FD 集合。

[code lang=”c”]
int res = epoll_wait(epfd, events, 20, 120);
for (int i = 0; i < res;i++)
{
    handleEvent(events[n]);
}
[/code]

5. Epoll 关键数据结构

前面提到 Epoll 速度快和其数据结构密不可分,其关键数据结构就是:

[code lang=”c”]
struct epoll_event {
__uint32_t events; // Epoll events
epoll_data_t data; // User data variable
};
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;

[/code]

可见 epoll_data 是一个 union 结构体 , 借助于它应用程序可以保存很多类型的信息
:fd 、指针等等。有了它,应用程序就可以直接定位目标了。

6. 使用 Epoll

既然 Epoll 相比 select 这么好,那么用起来如何呢?会不会很繁琐啊 …
先看看下面的三个函数吧,就知道 Epoll 的易用了。

[code lang=”c”]
int epoll_create(int size);
[/code]

生成一个 Epoll 专用的文件描述符,其实是申请一个内核空间,用来存放你想关注的 socket fd 上是否发生以及发生了什么事件。
size 就是你在这个 Epoll fd 上能关注的最大 socket fd 数,大小自定,只要内存足够。

[code lang=”c”]
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event );
[/code]

控制某个 Epoll 文件描述符上的事件:注册、修改、删除。其中参数 epfd 是 epoll_create() 创建 Epoll 专用的文件描述符。相对于 select 模型中的 FD_SET 和 FD_CLR 宏。

[code lang=”c”]
int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout);
[/code]

等待 I/O 事件的发生;参数说明:

epfd: 由 epoll_create()生成的 Epoll 专用的文件描述符;

epoll_event: 用于回传代处理事件的数组;

maxevents: 每次能处理的事件数;

timeout: 等待 I/O 事件发生的超时值;

返回发生事件数。

相对于 select 模型中的 select 函数。

7. 例子程序

下面是一个简单 Echo Server 的例子程序,麻雀虽小,五脏俱全,还包含了一个简单的超时检查机制,简洁起见没有做错误处理。

[code lang=”c”]

//
// a simple echo server using epoll in linux
//
// 2009-11-05
// by sparkling
//
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <iostream>
using namespace std;
#define MAX_EVENTS 500
struct myevent_s
{
int fd;
void (*call_back)(int fd, int events, void *arg);
int events;
void *arg;
int status; // 1: in epoll wait list, 0 not in
char buff[128]; // recv data buffer
int len;
long last_active; // last active time
};
// set event
void EventSet(myevent_s *ev, int fd, void (*call_back)(int, int, void*), void *arg)
{
ev->fd = fd;
ev->call_back = call_back;
ev->events = 0;
ev->arg = arg;
ev->status = 0;
ev->last_active = time(NULL);
}
// add/mod an event to epoll
void EventAdd(int epollFd, int events, myevent_s *ev)
{
struct epoll_event epv = {0, {0}};
int op;
epv.data.ptr = ev;
epv.events = ev->events = events;
if(ev->status == 1){
op = EPOLL_CTL_MOD;
}
else{
op = EPOLL_CTL_ADD;
ev->status = 1;
}
if(epoll_ctl(epollFd, op, ev->fd, &epv) < 0)
printf("Event Add failed[fd=%d]/n", ev->fd);
else
printf("Event Add OK[fd=%d]/n", ev->fd);
}
// delete an event from epoll
void EventDel(int epollFd, myevent_s *ev)
{
struct epoll_event epv = {0, {0}};
if(ev->status != 1) return;
epv.data.ptr = ev;
ev->status = 0;
epoll_ctl(epollFd, EPOLL_CTL_DEL, ev->fd, &epv);
}
int g_epollFd;
myevent_s g_Events[MAX_EVENTS+1]; // g_Events[MAX_EVENTS] is used by listen fd
void RecvData(int fd, int events, void *arg);
void SendData(int fd, int events, void *arg);
// accept new connections from clients
void AcceptConn(int fd, int events, void *arg)
{
struct sockaddr_in sin;
socklen_t len = sizeof(struct sockaddr_in);
int nfd, i;
// accept
if((nfd = accept(fd, (struct sockaddr*)&sin, &len)) == -1)
{
if(errno != EAGAIN && errno != EINTR)
{
printf("%s: bad accept", __func__);
}
return;
}
do
{
for(i = 0; i < MAX_EVENTS; i++)
{
if(g_Events[i].status == 0)
{
break;
}
}
if(i == MAX_EVENTS)
{
printf("%s:max connection limit[%d].", __func__, MAX_EVENTS);
break;
}
// set nonblocking
if(fcntl(nfd, F_SETFL, O_NONBLOCK) < 0) break;
// add a read event for receive data
EventSet(&g_Events[i], nfd, RecvData, &g_Events[i]);
EventAdd(g_epollFd, EPOLLIN|EPOLLET, &g_Events[i]);
printf("new conn[%s:%d][time:%d]/n", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port), g_Events[i].last_active);
}while(0);
}
// receive data
void RecvData(int fd, int events, void *arg)
{
struct myevent_s *ev = (struct myevent_s*)arg;
int len;
// receive data
len = recv(fd, ev->buff, sizeof(ev->buff)-1, 0);
EventDel(g_epollFd, ev);
if(len > 0)
{
ev->len = len;
ev->buff[len] = ‘/0’;
printf("C[%d]:%s/n", fd, ev->buff);
// change to send event
EventSet(ev, fd, SendData, ev);
EventAdd(g_epollFd, EPOLLOUT|EPOLLET, ev);
}
else if(len == 0)
{
close(ev->fd);
printf("[fd=%d] closed gracefully./n", fd);
}
else
{
close(ev->fd);
printf("recv[fd=%d] error[%d]:%s/n", fd, errno, strerror(errno));
}
}
// send data
void SendData(int fd, int events, void *arg)
{
struct myevent_s *ev = (struct myevent_s*)arg;
int len;
// send data
len = send(fd, ev->buff, ev->len, 0);
ev->len = 0;
EventDel(g_epollFd, ev);
if(len > 0)
{
// change to receive event
EventSet(ev, fd, RecvData, ev);
EventAdd(g_epollFd, EPOLLIN|EPOLLET, ev);
}
else
{
close(ev->fd);
printf("recv[fd=%d] error[%d]/n", fd, errno);
}
}
void InitListenSocket(int epollFd, short port)
{
int listenFd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(listenFd, F_SETFL, O_NONBLOCK); // set non-blocking
printf("server listen fd=%d/n", listenFd);
EventSet(&g_Events[MAX_EVENTS], listenFd, AcceptConn, &g_Events[MAX_EVENTS]);
// add listen socket
EventAdd(epollFd, EPOLLIN|EPOLLET, &g_Events[MAX_EVENTS]);
// bind & listen
sockaddr_in sin;
bzero(&sin, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = INADDR_ANY;
sin.sin_port = htons(port);
bind(listenFd, (const sockaddr*)&sin, sizeof(sin));
listen(listenFd, 5);
}
int main(int argc, char **argv)
{
short port = 12345; // default port
if(argc == 2){
port = atoi(argv[1]);
}
// create epoll
g_epollFd = epoll_create(MAX_EVENTS);
if(g_epollFd <= 0) printf("create epoll failed.%d/n", g_epollFd);
// create & bind listen socket, and add to epoll, set non-blocking
InitListenSocket(g_epollFd, port);
// event loop
struct epoll_event events[MAX_EVENTS];
printf("server running:port[%d]/n", port);
int checkPos = 0;
while(1){
// a simple timeout check here, every time 100, better to use a mini-heap, and add timer event
long now = time(NULL);
for(int i = 0; i < 100; i++, checkPos++) // doesn’t check listen fd
{
if(checkPos == MAX_EVENTS) checkPos = 0; // recycle
if(g_Events[checkPos].status != 1) continue;
long duration = now – g_Events[checkPos].last_active;
if(duration >= 60) // 60s timeout
{
close(g_Events[checkPos].fd);
printf("[fd=%d] timeout[%d–%d]./n", g_Events[checkPos].fd, g_Events[checkPos].last_active, now);
EventDel(g_epollFd, &g_Events[checkPos]);
}
}
// wait for events to happen
int fds = epoll_wait(g_epollFd, events, MAX_EVENTS, 1000);
if(fds < 0){
printf("epoll_wait error, exit/n");
break;
}
for(int i = 0; i < fds; i++){
myevent_s *ev = (struct myevent_s*)events[i].data.ptr;
if((events[i].events&EPOLLIN)&&(ev->events&EPOLLIN)) // read event
{
ev->call_back(ev->fd, events[i].events, ev->arg);
}
if((events[i].events&EPOLLOUT)&&(ev->events&EPOLLOUT)) // write event
{
ev->call_back(ev->fd, events[i].events, ev->arg);
}
}
}
// free resource
return 0;
}

[/code]

本文来自:http://blog.csdn.net/sparkliang/article/details/4770655

libevent ,一个强大的通讯框架库

libevent是个Linux下的开源项目,最初发起原因是,解决socket通讯中数万级别的大量连接请求的处理,实现高效高并发服务而设计。他是个面向消息的框架,使用者不用面对socket的底层函数,利用libevent的消息驱动框架,就能使其开发的服务轻松实现高效高连接高并发能力。

The libevent provides a mechanism to execute a callback function when a specific event occurs on a file descriptor or after a timeout has been reached. Furthermore, libevent also support callbacks due to signals or regular timeouts.

libevent is meant to replace the event loop found in event driven network servers. An application just needs to call event_dispatch() and then add or remove events dynamically without having to change the event loop.

目前已经有很多应用基于此框架开发,看看下面的列表你就会知道它到底有多厉害了:

Programs using libevent

The usefulness of libevent API is demonstrated by the following applications:

  • Vomit – Voice Over Misconfigured Internet Telephones
  • Crawl – A Small and Efficient HTTP Crawler
  • Libio – an input/output abstraction library
  • Honeyd – a virtual honeynet daemon – can be used to fight Internet worms.
  • Fragroute – an IDS testing tool
  • Nylon – nested proxy server
  • Disconcert – a Distributed Computing Framework for Loosely-Coupled Workstations.
  • PLB – pure load balancer: a free high-performance load balancer for Unix.
  • Trickle – a lightweight userspace bandwidth shaper.
  • Memcached – a high-performance, distributed memory object caching system.
  • watchcatd – software watchdog designed to take actions not as drastic as the usual solutions, which reset the machine.
  • ScanSSH – a fast SSH server and open proxy scanner.
  • Nttlscan – a network topology scanner for Honeyd.
  • NetChat – a combination of netcat and ppp’s chat.
  • Io – a small programming language; uses libevent for network communication.
  • Tor – an anonymous Internet communication system.
  • Systrace – a system call sandbox.
  • SpyBye – detect malware on web pages.
  • GreenSQL – an SQL database firewall.
  • dnsscan – a fast scanner for identifying open recursive dns resolvers
  • Kargo Event – a PHP extension for libevent.

Linux下如何清空socket的接收缓冲区的数据

最近碰到一个问题,对于阻塞模式的socket通讯,如果要实现设备的命令控制,那么进入命令流前,缓冲区不能存有上次通讯没有取回的信息,否则一旦命令发出,然后读取缓冲区,很显然会读到上一次的剩余数据。做法当然很简单,就是先清除接收区的缓冲数据,可是如何清除?

网上有很多这样的问题,但都没什么规范的解决办法,有的甚至为了达到清空的目的,建议先close一下socket,这个太大手笔了,为了解决一个小问题而大动干戈,根本不是个合理的解决办法。

还有就是用recv读取,但是由于不知道缓存里有多少数据,如果是阻塞模式,到最后必然等到超时才知道数据已经读取完毕,这是个问题。

另一个是用fgetc,通过返回判断是否是feof:

[code lang=”C”]
whlie (1) {
a=fgetc(f);
if (feof(f)) break;
//…
b=fgetc(f);
if (feof(f)) break;
//…
}[/code]

当然,我不知道读取完毕后最后一次调用fgetc会不会堵塞,需要测试。

在非阻塞模式下,我们用recv就可以轻松搞定了,但是阻塞模式下,由于我们不知道缓冲区有多少数据,不能直接调用recv尝试清除。

使用一个小小的技巧,利用select函数,我们可以轻松搞定这个问题:

select函数用于监视一个文件描述符集合,如果集合中的描述符没有变化,则一直阻塞在这里,直到超时时间到达;在超时时间内,一旦某个描述符触发了你所关心的事件,select立即返回,通过检索文件描述符集合处理相应事件;select函数出错则返回小于零的值,如果有事件触发,则返回触发事件的描述符个数;如果超时,返回0,即没有数据可读。

重点在于:我们可以用select的超时特性,将超时时间设置为0,通过检测select的返回值,就可以判断缓冲是否被清空。通过这个技巧,使一个阻塞的socket成了‘非阻塞’socket。

现在就可以得出解决方案了:使用select函数来监视要清空的socket描述符,并把超时时间设置为0,每次读取一个字节然后丢弃(或者按照业务需要进行处理,随你便了),一旦select返回0,说明缓冲区没数据了(“超时”了)。

[code lang=”C”]
struct timeval tmOut;
tmOut.tv_sec = 0;
tmOut.tv_usec = 0;
fd_set fds;
FD_ZEROS(&amp;fds);
FD_SET(skt, &amp;fds);

int nRet;
char tmp[2];
memset(tmp, 0, sizeof(tmp));

while(1)
{
nRet= select(FD_SETSIZE, &amp;fds, NULL, NULL, &amp;tmOut);
if(nRet== 0) break;
recv(skt, tmp, 1,0);
}
[/code]

 这种方式的好处是,不再需要用recv、recvfrom等阻塞函数直接去读取,而是使用select,利用其超时特性检测缓冲区是否为空来判断是否有数据,有数据时才调用recv进行清除。

有人说同样可以用recv和socket的超时设置去清空啊,这个没错,但是你需要直接对socket描述符设置超时时间,而为了清空数据而直接修改socket描述符的属性,可能会影响到其他地方的使用,造成系统奇奇怪怪的问题,所以,不推荐使用。



提高linux的连接限制

一般linux下TCP连接的限制在TD_SETSIZE,系统默认为1024,由FD_SETSIZE决定。

1.修改方法:
  修改/usr/etc/security/limits.conf文件,加入
  * soft nofile 20000
  * hard nofile 20000
  然后reboot系统。
服务器就可以建立连接到20000个了,其连接方法是直接用connect,accept,注意这里用select是不可以的。

2.上面的方法在不用select方法的情况下是不可以的,如果你用select,那么仍然只能打开1024个,这是因为select的数目由FD_SETSIZE决定的。那么我们可以改用poll来替代select,poll数组大小可以根据我们自己的需要来定义,这样就解决了这个问题。

3.linux中的是通过文件方式来管理系统的,因此系统能承载多少TCP连接和系统文件打开数目能力是相关的。
另外在/proc/sys/file-max中定义了,系统最多能够打开的文件数目。

附代码:
 服务器端:pollserver.cpp
#include <iostream>
#include <ctype.h>
#include <errno.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/file.h>
#include <sys/ioctl.h>
#include <sys/wait.h>
#include <sys/types.h>
//#include <asm/poll.h>
#include <netdb.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string>
#include <sys/poll.h>
#include <limits.h> /* for OPEN_MAX */

#define LPORT 3333
#define LISTENQ 1024
#define OPEN_MAX 50000
#define MAX_LINE 100
void setnonblocking(int sock)
{
 int opts;
 opts=fcntl(sock,F_GETFL);
 if(opts<0)
 {
  perror(“fcntl(sock,GETFL)”);
  exit(1);
 }

 opts = opts|O_NONBLOCK;
 if(fcntl(sock,F_SETFL,opts)<0)
 {
  perror(“fcntl(sock,SETFL,opts)”);
  exit(1);
 }   
}
int main(int argc, char **argv)
{
 //int i, maxi, listenfd, connfd, sockfd;
 //int nready; ssize_t n;

 int listenfd;
 struct sockaddr_in cliaddr;  //客户端的sock描述
 struct sockaddr_in servaddr; //服务器的sock描述

 struct pollfd *array_conn;
 array_conn =  new pollfd[OPEN_MAX];

 if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
 {
  perror(“socket create error!”);
  exit(1);
 }
 bzero(&servaddr, sizeof(servaddr));
 servaddr.sin_family = AF_INET;
 servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
 servaddr.sin_port = htons(LPORT);

 /* 置 socket 重新使用  */
 int opt = 1;
 if (setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,(char *) &opt,sizeof(opt))<0)
 {
  close(listenfd);
  perror(“!cserver::init_comm() setsockopt error!”);
  exit(1);
 }

 if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr))== -1)
 {
  perror(“bind error!”);
  exit(1);
 }
 listen(listenfd, LISTENQ);

 array_conn[0].fd = listenfd;
 array_conn[0].events = POLLRDNORM;
 int i;
 for (  i = 1; i < OPEN_MAX; i++)  array_conn[i].fd = -1; /* -1 indicates available entry */
 int maxi = 0;

 int connfd;
 int nready;
 int clilen;
 size_t n;
 int x; x=0;
 for ( ; ; ) {
  int nready = poll(array_conn, maxi+1, 0);

  if (array_conn[0].revents & POLLRDNORM)
  {
   clilen = sizeof(cliaddr);
   connfd = accept(listenfd, (struct sockaddr *) &cliaddr,(socklen_t*) &clilen);
            printf(” ||%d: Connection from %s:%d\n”,x++, inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
   // setnonblocking(connfd);
   for (i = 1; i < OPEN_MAX; i++)
    if (array_conn[i].fd < 0)
    {
     array_conn[i].fd = connfd; // save descriptor
     break;
    }
    if (i == OPEN_MAX)
    {
     printf(“too many clients”);
     break;
    }
    array_conn[i].events = POLLRDNORM;
    if (i > maxi) maxi = i; 
    if (–nready <= 0) continue; /* no more readable descriptors */
  }

  /* check all clients for data */
char line[100];
  int sockfd;
  for (i = 1; i <= maxi; i++)
  { /* check all clients for data */
   if ( (sockfd = array_conn[i].fd) < 0) continue;
   if (array_conn[i].revents & (POLLRDNORM | POLLERR))
   {
    if ( (n = read(sockfd, line, MAX_LINE)) < 0)
    {
     if (errno == ECONNRESET)
     {
      /*4connection reset by client */
      close(sockfd);
      array_conn[i].fd = -1;
     } else
     {
      printf(“readline error\n”);
     }
    }
    else if (n == 0)
    {
     /*4connection closed by client */
     close(sockfd);
     array_conn[i].fd = -1;
    } else
    {
     char sb[10];
     printf(“Recv:%s from socket %d”,line,sockfd);
     sprintf(sb,”Pong!”);
     write(sockfd, sb, 10);
    }
    if (–nready <= 0)
     break; /* no more readable descriptors */
   }
  }

 

 }
}
客户端pollclient.cpp
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>

int main()
{
 int sockfd;
 int address_len;
 int connect_flag;
 struct sockaddr_in address;
 int connect_result;
 char client_ch,server_ch;

 int *sock;
 sock= new int [50000];
 int index=0;
 int n=0;
 int ssock;
 address.sin_family=AF_INET;
 address.sin_addr.s_addr=inet_addr(“192.168.1.249”);
 address.sin_port=htons(3333);
 address_len=sizeof(address);
 while(1)
 {
  ssock=socket(AF_INET,SOCK_STREAM,0);
  if(ssock<0)
  {
   printf(“local sockfd error \n”);
   break;
  }

  connect_flag=connect(ssock,(struct sockaddr *)&address,address_len);
  if(connect_flag==-1)
  {
   perror(“client”);
   break;
  }
  printf(“%d Connected! “,n++);
  char cch[10];
  sprintf(cch,”Ping*”);
  write(ssock,cch,10);
  char rbuf[100];
  read(ssock,rbuf,100);
  printf( “%d Receive from server ;%s\n”,n,rbuf);
  //char c_ch;
  //c_ch=’*’;
  //write(ssock,&c_ch,1);
  //char rbuf[100];
  //read(ssock,rbuf,100);
  //printf( “%d Receive from server ;%s\n”,n,rbuf);

 }
 delete [] sock;
 return 0;
}

Linux下突破限制实现高并发量服务器

 
1、修改用户进程可打开文件数限制

在Linux平台上,无论编写客户端程序还是服务端程序,在进行高并发TCP连接处理时,最高的并发数量都要受到系统对用户单一进程同时可打开文件数量的 限制(这是因为系统为每个TCP连接都要创建一个socket句柄,每个socket句柄同时也是一个文件句柄)。可使用ulimit命令查看系统允许当 前用户进程打开的文件数限制:
[speng@as4 ~]$ ulimit -n
1024
这表示当前用户的每个进程最多允许同时打开1024个文件,这1024个文件中还得除去每个进程必然打开的标准输入,标准输出,标准错误,服务器监听 socket,进程间通讯的unix域socket等文件,那么剩下的可用于客户端socket连接的文件数就只有大概1024-10=1014个左右。 也就是说缺省情况下,基于Linux的通讯程序最多允许同时1014个TCP并发连接。

对于想支持更高数量的TCP并发连接的通讯处理程序,就必须修改Linux对当前用户的进程同时打开的文件数量的软限制(soft limit)和硬限制(hardlimit)。其中软限制是指Linux在当前系统能够承受的范围内进一步限制用户同时打开的文件数;硬限制则是根据系统 硬件资源状况(主要是系统内存)计算出来的系统最多可同时打开的文件数量。通常软限制小于或等于硬限制。

修改上述限制的最简单的办法就是使用ulimit命令:
[speng@as4 ~]$ ulimit -n <file_num>
上述命令中,在<file_num>中指定要设置的单一进程允许打开的最大文件数。如果系统回显类似于“Operation notpermitted”之类的话,说明上述限制修改失败,实际上是因为在<file_num>中指定的数值超过了Linux系统对该用户 打开文件数的软限制或硬限制。因此,就需要修改Linux系统对用户的关于打开文件数的软限制和硬限制。

第一步,修改/etc/security/limits.conf文件,在文件中添加如下行:
speng soft nofile 10240
speng hard nofile 10240
其中speng指定了要修改哪个用户的打开文件数限制,可用’*’号表示修改所有用户的限制;soft或hard指定要修改软限制还是硬限制;10240 则指定了想要修改的新的限制值,即最大打开文件数(请注意软限制值要小于或等于硬限制)。修改完后保存文件。

第二步,修改/etc/pam.d/login文件,在文件中添加如下行:
session required /lib/security/pam_limits.so
这是告诉Linux在用户完成系统登录后,应该调用pam_limits.so模块来设置系统对该用户可使用的各种资源数量的最大限制(包括用户可打开的 最大文件数限制),而pam_limits.so模块就会从/etc/security/limits.conf文件中读取配置来设置这些限制值。修改完 后保存此文件。

第三步,查看Linux系统级的最大打开文件数限制,使用如下命令:
[speng@as4 ~]$ cat /proc/sys/fs/file-max
12158
这表明这台Linux系统最多允许同时打开(即包含所有用户打开文件数总和)12158个文件,是Linux系统级硬限制,所有用户级的打开文件数限制都 不应超过这个数值。通常这个系统级硬限制是Linux系统在启动时根据系统硬件资源状况计算出来的最佳的最大同时打开文件数限制,如果没有特殊需要,不应 该修改此限制,除非想为用户级打开文件数限制设置超过此限制的值。修改此硬限制的方法是修改/etc/rc.local脚本,在脚本中添加如下行:
echo 22158 > /proc/sys/fs/file-max
这是让Linux在启动完成后强行将系统级打开文件数硬限制设置为22158。修改完后保存此文件。

完成上述步骤后重启系统,一般情况下就可以将Linux系统对指定用户的单一进程允许同时打开的最大文件数限制设为指定的数值。如果重启后用 ulimit-n命令查看用户可打开文件数限制仍然低于上述步骤中设置的最大值,这可能是因为在用户登录脚本/etc/profile中使用ulimit -n命令已经将用户可同时打开的文件数做了限制。由于通过ulimit-n修改系统对用户可同时打开文件的最大数限制时,新修改的值只能小于或等于上次 ulimit-n设置的值,因此想用此命令增大这个限制值是不可能的。所以,如果有上述问题存在,就只能去打开/etc/profile脚本文件,在文件 中查找是否使用了ulimit-n限制了用户可同时打开的最大文件数量,如果找到,则删除这行命令,或者将其设置的值改为合适的值,然后保存文件,用户退 出并重新登录系统即可。
通过上述步骤,就为支持高并发TCP连接处理的通讯处理程序解除关于打开文件数量方面的系统限制。

2、 修改网络内核对TCP连接的有关限制

在Linux上编写支持高并发TCP连接的客户端通讯处理程序时,有时会发现尽管已经解除了系统对用户同时打开文件数的限制,但仍会出现并发TCP连接数 增加到一定数量时,再也无法成功建立新的TCP连接的现象。出现这种现在的原因有多种。

第一种原因可能是因为Linux网络内核对本地端口号范围有限制。此时,进一步分析为什么无法建立TCP连接,会发现问题出在connect()调用返回 失败,查看系统错误提示消息是“Can’t assign requestedaddress”。同时,如果在此时用tcpdump工具监视网络,会发现根本没有TCP连接时客户端发SYN包的网络流量。这些情况 说明问题在于本地Linux系统内核中有限制。其实,问题的根本原因在于Linux内核的TCP/IP协议实现模块对系统中所有的客户端TCP连接对应的 本地端口号的范围进行了限制(例如,内核限制本地端口号的范围为1024~32768之间)。当系统中某一时刻同时存在太多的TCP客户端连接时,由于每 个TCP客户端连接都要占用一个唯一的本地端口号(此端口号在系统的本地端口号范围限制中),如果现有的TCP客户端连接已将所有的本地端口号占满,则此 时就无法为新的TCP客户端连接分配一个本地端口号了,因此系统会在这种情况下在connect()调用中返回失败,并将错误提示消息设为“Can’t assignrequested address”。有关这些控制逻辑可以查看Linux内核源代码,以linux2.6内核为例,可以查看tcp_ipv4.c文件中如下函数:
static int tcp_v4_hash_connect(struct sock *sk)
请注意上述函数中对变量sysctl_local_port_range的访问控制。变量sysctl_local_port_range的初始化则是在 tcp.c文件中的如下函数中设置:
void __init tcp_init(void)
内核编译时默认设置的本地端口号范围可能太小,因此需要修改此本地端口范围限制。
第一步,修改/etc/sysctl.conf文件,在文件中添加如下行:
net.ipv4.ip_local_port_range = 1024 65000
这表明将系统对本地端口范围限制设置为1024~65000之间。请注意,本地端口范围的最小值必须大于或等于1024;而端口范围的最大值则应小于或等 于65535。修改完后保存此文件。
第二步,执行sysctl命令:
[speng@as4 ~]$ sysctl -p
如果系统没有错误提示,就表明新的本地端口范围设置成功。如果按上述端口范围进行设置,则理论上单独一个进程最多可以同时建立60000多个TCP客户端 连接。

第二种无法建立TCP连接的原因可能是因为Linux网络内核的IP_TABLE防火墙对最大跟踪的TCP连接数有限制。此时程序会表现为在 connect()调用中阻塞,如同死机,如果用tcpdump工具监视网络,也会发现根本没有TCP连接时客户端发SYN包的网络流量。由于 IP_TABLE防火墙在内核中会对每个TCP连接的状态进行跟踪,跟踪信息将会放在位于内核内存中的conntrackdatabase中,这个数据库 的大小有限,当系统中存在过多的TCP连接时,数据库容量不足,IP_TABLE无法为新的TCP连接建立跟踪信息,于是表现为在connect()调用 中阻塞。此时就必须修改内核对最大跟踪的TCP连接数的限制,方法同修改内核对本地端口号范围的限制是类似的:
第一步,修改/etc/sysctl.conf文件,在文件中添加如下行:
net.ipv4.ip_conntrack_max = 10240
这表明将系统对最大跟踪的TCP连接数限制设置为10240。请注意,此限制值要尽量小,以节省对内核内存的占用。
第二步,执行sysctl命令:
[speng@as4 ~]$ sysctl -p
如果系统没有错误提示,就表明系统对新的最大跟踪的TCP连接数限制修改成功。如果按上述参数进行设置,则理论上单独一个进程最多可以同时建立10000 多个TCP客户端连接。

3、使用支持高并发网络I/O的编程技术

在Linux上编写高并发TCP连接应用程序时,必须使用合适的网络I/O技术和I/O事件分派机制。

可用的I/O技术有同步I/O,非阻塞式同步I/O(也称反应式I/O),以及异步I/O。在高TCP并发的情形下,如果使用同步I/O,这会严重阻塞程 序的运转,除非为每个TCP连接的I/O创建一个线程。但是,过多的线程又会因系统对线程的调度造成巨大开销。因此,在高TCP并发的情形下使用同步 I/O是不可取的,这时可以考虑使用非阻塞式同步I/O或异步I/O。非阻塞式同步I/O的技术包括使用select(),poll(),epoll等机 制。异步I/O的技术就是使用AIO。

从I/O事件分派机制来看,使用select()是不合适的,因为它所支持的并发连接数有限(通常在1024个以内)。如果考虑性能,poll()也是不 合适的,尽管它可以支持的较高的TCP并发数,但是由于其采用“轮询”机制,当并发数较高时,其运行效率相当低,并可能存在I/O事件分派不均,导致部分 TCP连接上的I/O出现“饥饿”现象。而如果使用epoll或AIO,则没有上述问题(早期Linux内核的AIO技术实现是通过在内核中为每个 I/O请求创建一个线程来实现的,这种实现机制在高并发TCP连接的情形下使用其实也有严重的性能问题。但在最新的Linux内核中,AIO的实现已经得 到改进)。

综上所述,在开发支持高并发TCP连接的Linux应用程序时,应尽量使用epoll或AIO技术来实现并发的TCP连接上的I/O控制,这将为提升程序 对高并发TCP连接的支持提供有效的I/O保证。

Date: 2007-01-31
OS: Red Hat Enterprise Linux AS release 4 (kernel version 2.6.9-5.EL)

五种I/O 模式
—————————————-
在Linux/UNIX 下,有下面这五种I/O 操作方式:
阻塞I/O
非阻塞I/O
I/O 多路复用
信号驱动I/O(SIGIO)
异步I/O

程序进行输入操作有两步:
等待有数据可以读
将数据从系统内核中拷贝到程序的数据区。

对于一个对套接字的输入操作:
第一步一般来说是,等待数据从网络上传到本地,当数据包到达的时候,数据将会从网络层拷贝到内核的缓存中;
第二步是从内核中把数据拷贝到程序的数据区中

.阻塞I/O 模式
简单的说,阻塞就是"睡眠"的同义词
如你运行上面的listener 的时候,它只不过是简单的在那里等待接收数据。它调用recvfrom()函数,但是那个时候(listener 调用recvfrom()函数的时候),它并没有数据可以接收.所以recvfrom()函数阻塞在那里(也就是程序停在recvfrom()函数处睡大 觉)直到有数据传过来阻塞.你应该明白它的意思。

阻塞I/O 模式是最普遍使用的I/O 模式。大部分程序使用的都是阻塞模式的I/O 。
缺省的,一个套接字建立后所处于的模式就是阻塞I/O 模式。

对于一个UDP 套接字来说,数据就绪的标志比较简单:
已经收到了一整个数据报
没有收到。

而TCP 这个概念就比较复杂,需要附加一些其他的变量
一个进程调用recvfrom ,然后系统调用并不返回知道有数据报到达本地系统,然后系统将数据拷贝到进程的缓存中。
(如果系统调用收到一个中断信号,则它的调用会被中断)我们称这个进程在调用recvfrom 一直到从recvfrom 返回这段时间是阻塞的。
当recvfrom正常返回时,我们的进程继续它的操作。

.非阻塞模式I/O
当我们将一个套接字设置为非阻塞模式,我们相当于告诉了系统内核:“当我请求的I/O 操作不能够马上完成,你想让我的进程进行休眠等待的时候,不要这么做,请马上返回一个错误给我。”

如我们开始对recvfrom 的三次调用,因为系统还没有接收到网络数据,所以内核马上返回一个EWOULDBLOCK的错误。
第四次我们调用recvfrom 函数,一个数据报已经到达了,内核将它拷贝到我们的应用程序的缓冲区中,然后recvfrom 正常返回,我们就可以对接收到的数据进行处理了。

当一个应用程序使用了非阻塞模式的套接字,它需要使用一个循环来不听的测试是否一个文件描述符有数据可读(称做polling)。
应用程序不停的polling 内核来检查是否I/O操作已经就绪。这将是一个极浪费CPU 资源的操作。这种模式使用中不是很普遍

.I/O 多路复用 select()
在使用I/O 多路技术的时候,我们调用select()函数和poll()函数,在调用它们的时候阻塞,而不是我们来调用recvfrom(或recv)的时候阻塞。
当我们调用select 函数阻塞的时候,select 函数等待数据报套接字进入读就绪状态。当select 函数返回的时候,也就是套接字可以读取数据的时候。这时候我们就可以调用recvfrom函数来将数据拷贝到我们的程序缓冲区中。
和阻塞模式相比较,select()和poll()并没有什么高级的地方,而且,在阻塞模式下只需要调用一个函数:读取或发送,在使用了多路复用技术后, 我们需要调用两个函数了:先调用select()函数或poll()函数,然后才能进行真正的读写。

多路复用的高级之处在于,它能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回

假设我们运行一个网络客户端程序,要同时处理套接字传来的网络数据又要处理本地的标准输入输出。在我们的程序处于阻塞状态等待标准输入的数据的时候,假如 服务器端的程序被kill(或是自己Down 掉了),那么服务器程端的TCP 协议会给客户端(我们这端)的TCP 协议发送一个FIN 数据代表终止连接。但是我们的程序阻塞在等待标准输入的数据上,在它读取套接字数据之前(也许是很长一段时间),它不会看见结束标志.我们就不能够使用阻 塞模式的套接字。

I/O多路技术一般在下面这些情况中被使用:
当一个客户端需要同时处理多个文件描述符的输入输出操作的时候(一般来说是标准的输入输出和网络套接字), I/O 多路复用技术将会有机会得到使用。
当程序需要同时进行多个套接字的操作的时候。
如果一个TCP 服务器程序同时处理正在侦听网络连接的套接字和已经连接好的套接字。
如果一个服务器程序同时使用TCP 和UDP 协议。
如果一个服务器同时使用多种服务并且每种服务可能使用不同的协议(比如inetd就是这样的)。

I/O 多路服用技术并不只局限与网络程序应用上。几乎所有的程序都可以找到应用I/O多路复用的地方。