查看源码 套接字使用

简介

套接字接口(模块)基本上是操作系统套接字接口之上的一个“薄”层。假设除非您有特殊需求,否则 gen_[tcp|udp|sctp] 应该足够(当它们可用时)。

请注意,仅仅因为我们有一个已记录和描述的选项,并不意味着操作系统支持它。因此,建议用户阅读所使用选项的特定于平台的文档。

异步调用

某些函数允许异步调用(accept/2connect/3recv/3,4recvfrom/3,4recvmsg/2,3,5send/3,4sendmsg/3,4sendto/4,5)。这是通过将 Timeout 参数设置为 nowait 来实现的。例如,如果使用设置为 nowait 的 Timeout 调用 recv/3 函数(即 recv(Sock, 0, nowait)),而实际上没有任何内容可读,它将返回

当数据最终到达时,将向调用者发送 'select' 或 'completion' 消息

  • 在 Unix 上 - {'$socket', socket(), select, SelectHandle}

    然后,调用者可以再次调用 recv 函数,并期望现在有数据。

    请注意,所有其他用户都将被锁定,直到“当前用户”调用该函数(本例中为 recv)。因此,要么立即调用该函数,要么使用 cancel 取消。

  • 在 Windows 上 - {'$socket', socket(), completion, {CompletionHandle, CompletionStatus}}

    CompletionStatus 包含操作(读取)的结果。

用户还必须准备好接收中止消息

  • {'$socket', socket(), abort, Info}

如果操作因任何原因中止(例如,如果套接字被“其他人”关闭)。Info 部分包含中止原因(在这种情况下,套接字已关闭 Info = {SelectHandle, closed})。

'socket' 消息的通用形式是

  • {'$socket', Sock :: socket(), Tag :: atom(), Info :: term()}

其中 Info 的格式是 Tag 的函数

标记信息值类型
选择select_handle()
完成{completion_handle(), CompletionStatus}
中止{select_handle(), Reason :: term()}

表格:套接字消息信息值类型

select_handle()SelectInfo 中返回的相同。

completion_handle()CompletionInfo 中返回的相同。

套接字注册表

套接字注册表是我们跟踪套接字的方式。可以使用两个函数进行交互:socket:number_of/0socket:which_sockets/1

在动态创建和删除许多套接字的系统中,它(套接字注册表)可能会成为瓶颈。对于此类系统,有几种方法可以控制套接字注册表的使用。

首先,可以使用两个配置选项,在从源代码构建 OTP 时影响全局默认值

--enable-esock-socket-registry (default) | --disable-esock-socket-registry

其次,可以通过在启动 erlang 之前设置环境变量 ESOCK_USE_SOCKET_REGISTRY(布尔值)来影响全局默认值。

第三,可以通过调用函数 use_registry/1 在运行时更改全局默认值。

最后,可以在创建套接字时(使用 open/2open/4)通过在其 Opts 参数中提供属性 use_registry(布尔值)来覆盖全局默认值(这会影响特定套接字)。

示例

此示例旨在展示如何创建简单的(回显)服务器(和客户端)。

-module(example).

-export([client/2, client/3]).
-export([server/0, server/1, server/2]).


%% ======================================================================

%% === Client ===

client(#{family := Family} = ServerSockAddr, Msg)
  when is_list(Msg) orelse is_binary(Msg) ->
    {ok, Sock} = socket:open(Family, stream, default),
    ok         = maybe_bind(Sock, Family),
    ok         = socket:connect(Sock, ServerSockAddr),
    client_exchange(Sock, Msg);

client(ServerPort, Msg)
  when is_integer(ServerPort) andalso (ServerPort > 0) ->
    Family   = inet, % Default
    Addr     = get_local_addr(Family), % Pick an address
    SockAddr = #{family => Family,
		 addr   => Addr,
		 port   => ServerPort},
    client(SockAddr, Msg).

client(ServerPort, ServerAddr, Msg)
  when is_integer(ServerPort) andalso (ServerPort > 0) andalso
       is_tuple(ServerAddr) ->
    Family   = which_family(ServerAddr),
    SockAddr = #{family => Family,
		 addr   => ServerAddr,
		 port   => ServerPort},
    client(SockAddr, Msg).

%% Send the message to the (echo) server and wait for the echo to come back.
client_exchange(Sock, Msg) when is_list(Msg) ->
    client_exchange(Sock, list_to_binary(Msg));
client_exchange(Sock, Msg) when is_binary(Msg) ->
    ok = socket:send(Sock, Msg, infinity),
    {ok, Msg} = socket:recv(Sock, byte_size(Msg), infinity),
    ok.


%% ======================================================================

%% === Server ===

server() ->
    %% Make system choose port (and address)
    server(0).

%% This function return the port and address that it actually uses,
%% in case server/0 or server/1 (with a port number) was used to start it.

server(#{family := Family, addr := Addr, port := _} = SockAddr) ->
    {ok, Sock} = socket:open(Family, stream, tcp),
    ok         = socket:bind(Sock, SockAddr),
    ok         = socket:listen(Sock),
    {ok, #{port := Port}} = socket:sockname(Sock),
    Acceptor = start_acceptor(Sock),
    {ok, {Port, Addr, Acceptor}};

server(Port) when is_integer(Port) ->
    Family   = inet, % Default
    Addr     = get_local_addr(Family), % Pick an address
    SockAddr = #{family => Family,
		 addr   => Addr,
		 port   => Port},
    server(SockAddr).

server(Port, Addr)
  when is_integer(Port) andalso (Port >= 0) andalso
       is_tuple(Addr) ->
    Family   = which_family(Addr),
    SockAddr = #{family => Family,
		 addr   => Addr,
		 port   => Port},
    server(SockAddr).


%% --- Echo Server - Acceptor ---

start_acceptor(LSock) ->
    Self = self(),
    {Pid, MRef} = spawn_monitor(fun() -> acceptor_init(Self, LSock) end),
    receive
	{'DOWN', MRef, process, Pid, Info} ->
	    erlang:error({failed_starting_acceptor, Info});
	{Pid, started} ->
	    %% Transfer ownership
	    socket:setopt(LSock, otp, owner, Pid),
	    Pid ! {self(), continue},
	    erlang:demonitor(MRef),
	    Pid
    end.
    
acceptor_init(Parent, LSock) ->
    Parent ! {self(), started},
    receive
	{Parent, continue} ->
	    ok
    end,
    acceptor_loop(LSock).

acceptor_loop(LSock) ->
    case socket:accept(LSock, infinity) of
	{ok, ASock} ->
	    start_handler(ASock),
	    acceptor_loop(LSock);
	{error, Reason} ->
	    erlang:error({accept_failed, Reason})
    end.


%% --- Echo Server - Handler ---

start_handler(Sock) ->
    Self = self(),
    {Pid, MRef} = spawn_monitor(fun() -> handler_init(Self, Sock) end),
    receive
	{'DOWN', MRef, process, Pid, Info} ->
	    erlang:error({failed_starting_handler, Info});
	{Pid, started} ->
	    %% Transfer ownership
	    socket:setopt(Sock, otp, owner, Pid),
	    Pid ! {self(), continue},
	    erlang:demonitor(MRef),
	    Pid
    end.

handler_init(Parent, Sock) ->
    Parent ! {self(), started},
    receive
	{Parent, continue} ->
	    ok
    end,
    handler_loop(Sock, undefined).

%% No "ongoing" reads
%% The use of 'nowait' here is clearly *overkill* for this use case,
%% but is intended as an example of how to use it.
handler_loop(Sock, undefined) ->
    case socket:recv(Sock, 0, nowait) of
	{ok, Data} ->
	    echo(Sock, Data),
	    handler_loop(Sock, undefined);

	{select, SelectInfo} ->
	    handler_loop(Sock, SelectInfo);

	{completion, CompletionInfo} ->
	    handler_loop(Sock, CompletionInfo);

	{error, Reason} ->
	    erlang:error({recv_failed, Reason})
    end;

%% This is the standard (asyncronous) behaviour.
handler_loop(Sock, {select_info, recv, SelectHandle}) ->
    receive
	{'$socket', Sock, select, SelectHandle} ->
	    case socket:recv(Sock, 0, nowait) of
		{ok, Data} ->
		    echo(Sock, Data),
		    handler_loop(Sock, undefined);

		{select, NewSelectInfo} ->
		    handler_loop(Sock, NewSelectInfo);

		{error, Reason} ->
		    erlang:error({recv_failed, Reason})
	    end
    end;

%% This is the (asyncronous) behaviour on platforms that support 'completion',
%% currently only Windows.
handler_loop(Sock, {completion_info, recv, CompletionHandle}) ->
    receive
	{'$socket', Sock, completion, {CompletionHandle, CompletionStatus}} ->
	    case CompletionStatus of
		{ok, Data} ->
		    echo(Sock, Data),
		    handler_loop(Sock, undefined);
		{error, Reason} ->
		    erlang:error({recv_failed, Reason})
	    end
    end.

echo(Sock, Data) when is_binary(Data) ->
    ok = socket:send(Sock, Data, infinity),
    io:format("** ECHO **"
	      "~n~s~n", [binary_to_list(Data)]).


%% ======================================================================

%% === Utility functions ===

maybe_bind(Sock, Family) ->
    maybe_bind(Sock, Family, os:type()).

maybe_bind(Sock, Family, {win32, _}) ->
    Addr     = get_local_addr(Family),
    SockAddr = #{family => Family,
                 addr   => Addr,
                 port   => 0},
    socket:bind(Sock, SockAddr);
maybe_bind(_Sock, _Family, _OS) ->
    ok.

%% The idea with this is extract a "usable" local address
%% that can be used even from *another* host. And doing
%% so using the net module.

get_local_addr(Family) ->
    Filter =
	fun(#{addr  := #{family := Fam},
	      flags := Flags}) ->
		(Fam =:= Family) andalso (not lists:member(loopback, Flags));
	   (_) ->
		false
	end,
    {ok, [SockAddr|_]} = net:getifaddrs(Filter),
    #{addr := #{addr := Addr}} = SockAddr,
    Addr.

which_family(Addr) when is_tuple(Addr) andalso (tuple_size(Addr) =:= 4) ->
    inet;
which_family(Addr) when is_tuple(Addr) andalso (tuple_size(Addr) =:= 8) ->
    inet6.

套接字选项

级别 otp 的选项

选项名称值类型设置获取其他要求和注释
assoc_idinteger()type = seqpacket,protocol = sctp,是一个关联
调试boolean()
iowboolean()
controlling_processpid()
rcvbufdefault | pos_integer() | {pos_integer(), pos_ineteger()}元组格式在 Windows 上不允许。'default' 仅对设置有效。元组形式仅对类型 'stream' 和协议 'tcp' 有效。
rcvctrlbufdefault | pos_integer()default 仅对设置有效
sndctrlbufdefault | pos_integer()default 仅对设置有效
fdinteger()
use_registryboolean()该值在创建套接字时通过调用 open/2open/4 设置。

表格:选项级别

级别 socket 的选项

选项名称值类型设置获取其他要求和注释
acceptconnboolean()
bindtodevicestring()在 Linux 3.8 之前,可以设置此套接字选项,但无法获取。仅适用于某些套接字类型(例如,inet)。如果设置为空值,则会删除绑定。
broadcastboolean()type = dgram
bsp_statemap()仅限 Windows
调试integer()可能需要管理员权限
domaindomain()例如,在 FreeBSD 上适用
dontrouteboolean()
exclusiveaddruseboolean()仅限 Windows
keepaliveboolean()
lingerabort | linger()
maxdginteger()仅限 Windows
max_msg_sizeinteger()仅限 Windows
oobinlineboolean()
peek_offinteger()domain = local (unix)。目前由于第二次调用 recv([peek]) 时可能出现的无限循环而被禁用。
priorityinteger()
protocolprotocol()例如,在(某些)Darwin 上适用
rcvbufnon_neg_integer()
rcvlowatnon_neg_integer()
rcvtimeotimeval()此选项通常不受支持(请参阅下面的原因)。OTP 必须使用 --enable-esock-rcvsndtime 配置选项显式构建才能使用。由于我们的实现是非阻塞的,因此不清楚此选项是否有效以及如何工作,甚至是否可能导致故障。因此,我们不建议设置此选项。相反,请使用 Timeout 参数,例如 recv/3 函数。
reuseaddrboolean()
reuseportboolean()domain = inet | inet6
sndbufnon_neg_integer()
sndlowatnon_neg_integer()在 Linux 上不可更改
sndtimeotimeval()此选项通常不受支持(请参阅下面的原因)。OTP 必须使用 --enable-esock-rcvsndtime 配置选项显式构建才能使用。由于我们的实现是非阻塞的,因此不清楚此选项是否有效以及如何工作,甚至是否可能导致故障。因此,我们不建议设置此选项。相反,请使用 Timeout 参数,例如 send/3 函数。
timestampboolean()
typetype()

表格:套接字选项

级别 ip 的选项

选项名称值类型设置获取其他要求和注释
add_membershipip_mreq()
add_source_membershipip_mreq_source()
block_sourceip_mreq_source()
drop_membershipip_mreq()
drop_source_membershipip_mreq_source()
freebindboolean()
hdrinclboolean()type = raw
minttlinteger()type = raw
msfilternull | ip_msfilter()
mtuinteger()type = raw
mtu_discoverip_pmtudisc()
multicast_allboolean()
multicast_ifany | ip4_address()
multicast_loopboolean()
multicast_ttluint8()
nodefragboolean()type = raw
pktinfoboolean()type = dgram
recvdstaddrboolean()type = dgram
recverrboolean()
recvifboolean()type = dgram | raw
recvoptsboolean()type =/= stream
recvorigdstaddrboolean()
recvttlboolean()type =/= stream
retoptsboolean()type =/= stream
router_alertinteger()type = raw
sendsrcaddrboolean()
tosip_tos()某些高优先级级别可能需要超级用户权限
transparentboolean()需要管理员权限
ttlinteger()
unblock_sourceip_mreq_source()

表格:ip 选项

级别 ipv6 的选项

选项名称值类型设置获取其他要求和注释
addrforminet仅允许用于已连接并绑定到 v4 映射到 v6 地址的 IPv6 套接字
add_membershipipv6_mreq()
authhdrboolean()type = dgram | raw,已过时?
drop_membershipipv6_mreq()
dstoptsboolean()type = dgram | raw,需要超级用户权限才能更新
flowinfoboolean()type = dgram | raw,需要超级用户权限才能更新
hoplimitboolean()type = dgram | raw。在某些平台(例如 FreeBSD)上,用于设置以获取 hoplimit 作为控制消息标头。在其他平台(例如 Linux)上,设置 recvhoplimit 以获取 hoplimit
hopoptsboolean()type = dgram | raw,需要超级用户权限才能更新
mtuboolean()获取:仅在套接字连接后
mtu_discoveripv6_pmtudisc()
multicast_hopsdefault | uint8()
multicast_ifinteger()type = dgram | raw
multicast_loopboolean()
recverrboolean()
recvhoplimitboolean()类型 = dgram | raw。在某些平台(例如 Linux)上,设置 recvhoplimit 以获取 hoplimit
recvpktinfo | pktinfoboolean()type = dgram | raw。在某些平台(例如 FreeBSD)上,用于设置以获取 hoplimit 作为控制消息标头。在其他平台(例如 Linux)上,设置 recvhoplimit 以获取 hoplimit
recvtclassboolean()类型 = dgram | raw。在某些平台上,设置为 (=true) 以获取 tclass 控制消息头。在其他平台上,设置 tclass 以获取 tclass 控制消息头。
router_alertinteger()type = raw
rthdrboolean()type = dgram | raw,需要超级用户权限才能更新
tclassinteger()设置与传出数据包关联的流量类别。RFC3542。
unicast_hopsdefault | uint8()
v6onlyboolean()

表:ipv6 选项

级别 tcp 的选项

选项名称值类型设置获取其他要求和注释
congestionstring()
corkboolean()在某些平台(FreeBSD)上为 'nopush'
keepcntinteger()在 Windows(至少)上,设置为大于 255 的值是非法的。
keepidleinteger()
keepintvlinteger()
maxseginteger()并非所有平台都允许设置。
nodelayboolean()
nopushboolean()在某些平台(Linux)上为 'cork'。在 Darwin 上,它的含义与 FreeBSD 等平台不同。

表:tcp 选项

级别 udp 的选项

选项名称值类型设置获取其他要求和注释
corkboolean()

表:udp 选项

级别 sctp 的选项

选项名称值类型设置获取其他要求和注释
associnfosctp_assocparams()
autoclosenon_neg_integer()
disable_fragmentsboolean()
eventssctp_event_subscribe()
initmsgsctp_initmsg()
maxsegnon_neg_integer()
nodelayboolean()
rtoinfosctp_rtoinfo()

表:sctp 选项