查看源代码 Erlang I/O 协议
Erlang 中的 I/O 协议支持客户端和服务器之间的双向通信。
- I/O 服务器是一个进程,它处理请求并对例如 I/O 设备执行请求的任务。
- 客户端是任何希望从 I/O 设备读取或写入数据的 Erlang 进程。
通用的 I/O 协议从 OTP 的一开始就存在,但一直没有文档记录,并且多年来也在不断发展。在 Robert Virding 的原理附录中,描述了原始的 I/O 协议。本节描述了当前的 I/O 协议。
原始的 I/O 协议简单且灵活。对内存效率和执行时间效率的需求触发了多年来对协议的扩展,使协议比原始协议更大,并且实现起来有些困难。当然可以认为当前的协议过于复杂,但本节描述的是它现在的样子,而不是它应该是什么样子。
原始协议的基本思想仍然成立。I/O 服务器和客户端使用一个简单的协议进行通信,客户端中永远不存在服务器状态。任何 I/O 服务器都可以与任何客户端代码一起使用,并且客户端代码不需要知道 I/O 服务器与之通信的 I/O 设备。
协议基础
正如 Robert 的论文中所述,I/O 服务器和客户端使用 io_request
/io_reply
元组进行通信,如下所示
{io_request, From, ReplyAs, Request}
{io_reply, ReplyAs, Reply}
客户端向 I/O 服务器发送一个 io_request
元组,服务器最终会发送一个对应的 io_reply
元组。
From
是客户端的pid/0
,I/O 服务器向该进程发送 I/O 回复。ReplyAs
可以是任何数据,并在对应的io_reply
中返回。io
模块监视 I/O 服务器,并使用监视引用作为ReplyAs
数据。更复杂的客户端可能对同一 I/O 服务器有许多未完成的 I/O 请求,并且可以使用不同的引用(或其他内容)来区分传入的 I/O 回复。元素ReplyAs
被 I/O 服务器视为不透明。请注意,I/O 服务器的
pid/0
没有显式出现在元组io_reply
中。回复可以从任何进程发送,不一定是实际的 I/O 服务器。Request
和Reply
将在下面描述。
当 I/O 服务器收到一个 io_request
元组时,它会根据 Request
部分执行操作,并最终发送一个带有相应 Reply
部分的 io_reply
元组。
输出请求
要在 I/O 设备上输出字符,存在以下 Request
:
{put_chars, Encoding, Characters}
{put_chars, Encoding, Module, Function, Args}
Encoding
是unicode
或latin1
,这意味着字符(在二进制的情况下)编码为 UTF-8 或 ISO Latin-1(纯字节)。如果列表元素包含大于 255 的整数,并且Encoding
设置为latin1
,则行为良好的 I/O 服务器还会返回错误指示。请注意,这并不会以任何方式说明如何将字符放在 I/O 设备上或由 I/O 服务器处理。不同的 I/O 服务器可以按照它们希望的任何方式处理字符,这只是告诉 I/O 服务器期望数据具有哪种格式。在
Module
/Function
/Args
的情况下,Encoding
告诉指定函数产生哪种格式。另请注意,面向字节的数据最简单的方法是使用 ISO Latin-1 编码发送。
Characters
是要放置在 I/O 设备上的数据。如果Encoding
是latin1
,这是一个iolist/0
。如果Encoding
是unicode
,这是一个 Erlang 标准的混合 Unicode 列表(每个字符在一个列表中的整数,二进制文件中的字符表示为 UTF-8)。Module
、Function
和Args
表示一个被调用以产生数据的函数(例如io_lib:format/2
)。Args
是函数的参数列表。该函数用于生成指定Encoding
中的数据。I/O 服务器将以apply(Mod, Func, Args)
的方式调用该函数,并将返回的数据放置在 I/O 设备上,就像它是在{put_chars, Encoding, Characters}
请求中发送的一样。如果该函数返回除二进制或列表之外的任何内容,或抛出异常,则应将错误发送回客户端。
I/O 服务器使用 io_reply
元组回复客户端,其中元素 Reply
是以下之一:
ok
{error, Error}
Error
向客户端描述错误,客户端可以随意处理它。io
模块通常“按原样”返回它。
输入请求
要从 I/O 设备读取字符,存在以下 Request
:
{get_until, Encoding, Prompt, Module, Function, ExtraArgs}
Encoding
表示如何将数据发送回客户端,以及将哪些数据发送到由Module
/Function
/ExtraArgs
表示的函数。如果提供的函数以列表形式返回数据,则将数据转换为此编码。如果提供的函数以其他某种格式返回数据,则无法进行转换,并且由客户端提供的函数以正确的方式返回数据。如果
Encoding
是latin1
,则在可能的情况下,整数0..255
的列表或包含纯字节的二进制文件将发送回客户端。如果Encoding
是unicode
,则具有整个 Unicode 范围内的整数的列表或以 UTF-8 编码的二进制文件将发送到客户端。用户提供的函数始终看到整数列表,而不是二进制文件,但是如果Encoding
是unicode
,则该列表可以包含大于 255 的数字。Prompt
是字符列表(非混合,无二进制文件)或一个原子,将作为 I/O 设备上的输入提示输出。Prompt
通常被 I/O 服务器忽略;如果设置为''
,则始终被忽略(并且不会写入任何内容到 I/O 设备)。Module
、Function
和ExtraArgs
表示一个函数和用于确定何时写入足够数据的参数。该函数要接受两个附加参数,即最后的状态和字符列表。该函数要返回以下之一:{done, Result, RestChars} {more, Continuation}
Result
可以是任何 Erlang 项,但如果它是list/0
,则如果 I/O 服务器设置为二进制模式(请参见下文),则 I/O 服务器可以在将其返回到客户端之前将其转换为适当格式的binary/0
。使用 I/O 服务器在其 I/O 设备上找到的数据调用该函数,返回以下之一:
- 当读取足够的数据时,
{done, Result, RestChars}
。在这种情况下,Result
被发送到客户端,并且RestChars
保存在 I/O 服务器中,作为稍后输入的缓冲区。 {more, Continuation}
,表示需要更多字符才能完成请求。
当更多字符可用时,
Continuation
作为后续函数调用的状态发送。当没有更多字符可用时,该函数必须返回{done, eof, Rest}
。初始状态是空列表。当在 I/O 设备上到达文件末尾时,数据是原子eof
。可以使用以下函数(低效地)实现
get_line
请求的模拟:-module(demo). -export([until_newline/3, get_line/1]). until_newline(_ThisFar,eof,_MyStopCharacter) -> {done,eof,[]}; until_newline(ThisFar,CharList,MyStopCharacter) -> case lists:splitwith(fun(X) -> X =/= MyStopCharacter end, CharList) of {L,[]} -> {more,ThisFar++L}; {L2,[MyStopCharacter|Rest]} -> {done,ThisFar++L2++[MyStopCharacter],Rest} end. get_line(IoServer) -> IoServer ! {io_request, self(), IoServer, {get_until, unicode, '', ?MODULE, until_newline, [$\n]}}, receive {io_reply, IoServer, Data} -> Data end.
请注意,当调用函数时,
Request
元组 ([$\n]
) 中的最后一个元素会附加到参数列表中。I/O 服务器要像apply(Module, Function, [ State, Data | ExtraArgs ])
一样调用该函数。- 当读取足够的数据时,
使用以下 Request
请求固定数量的字符:
{get_chars, Encoding, Prompt, N}
Encoding
和Prompt
与get_until
相同。N
是要从 I/O 设备读取的字符数。
使用以下 Request
请求单行(如前面的示例):
{get_line, Encoding, Prompt}
Encoding
和Prompt
与get_until
相同。
显然,可以使用 get_until
请求来实现 get_chars
和 get_line
(实际上它们最初就是这样实现的),但是对效率的需求使得这些添加成为必要。
I/O 服务器使用 io_reply
元组回复客户端,其中元素 Reply
是以下之一:
Data
eof
{error, Error}
Data
是读取的字符,以列表或二进制形式(取决于 I/O 服务器模式,请参见下一节)。- 当输入结束并且客户端进程没有更多可用数据时,将返回
eof
。 Error
向客户端描述错误,客户端可以随意处理它。io
模块通常按原样返回它。
I/O 服务器模式
从 I/O 服务器读取数据时对效率的需求不仅导致添加了 get_line
和 get_chars
请求,而且还添加了 I/O 服务器选项的概念。没有强制实施的选项,但是 Erlang 标准库中的所有 I/O 服务器都支持 binary
选项,该选项允许 io_reply
元组的元素 Data
在可能的情况下是二进制文件而不是列表。如果数据以二进制形式发送,则 Unicode 数据以标准 Erlang Unicode 格式(即 UTF-8)发送(请注意,get_until
请求的函数仍然获取列表数据,而不管 I/O 服务器模式如何)。
请注意,get_until
请求允许数据始终指定为列表的函数。此外,此类函数的返回值数据可以是任何类型(当 io:fread/2,3
请求发送到 I/O 服务器时,确实如此)。客户端必须为接收到的作为对这些请求的响应的数据做好各种形式的准备。但是,I/O 服务器要在可能的情况下(即,当提供给 get_until
的函数返回列表时)将结果转换为二进制文件。在 带注释的工作 I/O 服务器示例 部分的示例中进行了此操作。
二进制模式下的 I/O 服务器会影响发送到客户端的数据,因此它必须能够处理二进制数据。为方便起见,可以使用以下 I/O 请求设置和检索 I/O 服务器的模式:
{setopts, Opts}
Opts
是proplists
模块(和 I/O 服务器)识别的格式的选项列表。
例如,交互式 shell 的 I/O 服务器(位于 group.erl
中)可以理解以下选项
{binary, boolean()} (or binary/list)
{echo, boolean()}
{expand_fun, fun()}
{encoding, unicode/latin1} (or unicode/latin1)
选项 binary
和 encoding
是 OTP 中所有 I/O 服务器通用的,而 echo
和 expand
仅对此 I/O 服务器有效。选项 unicode
通知如何将字符放置在物理 I/O 设备上,也就是说,终端本身是否支持 Unicode。它不会影响在 I/O 协议中发送字符的方式,其中每个请求都包含所提供或返回数据的编码信息。
I/O 服务器应将以下内容之一作为 Reply
发送
ok
{error, Error}
如果 I/O 服务器不支持该选项(例如,如果向普通文件发送 setopts
请求中的 echo
选项),则应预期会出现错误(最好是 enotsup
)。
要检索选项,请使用以下请求
getopts
此请求会请求 I/O 服务器支持的所有选项的完整列表以及它们的当前值。
I/O 服务器回复
OptList
{error, Error}
OptList
是一个元组{Option, Value}
的列表,其中Option
始终是原子。
多个 I/O 请求
通过使用以下格式,Request
元素本身可以包含多个 Request
{requests, Requests}
Requests
是协议的有效io_request
元组的列表。它们必须按照在列表中出现的顺序执行。执行将继续,直到其中一个请求导致错误或列表被消耗完毕。最后一个请求的结果将发送回客户端。
对于请求列表,I/O 服务器可以在回复中发送以下任何有效结果,具体取决于列表中的请求
ok
{ok, Data}
{ok, Options}
{error, Error}
可选 I/O 请求
以下 I/O 请求是可选的,客户端应准备好接收错误返回
{get_geometry, Geometry}
Geometry
是原子rows
或原子columns
。
I/O 服务器应将以下内容之一作为 Reply
发送
N
{error, Error}
N
是 I/O 设备具有的字符行数或列数,如果适用于 I/O 服务器处理的 I/O 设备,否则{error, enotsup}
是一个很好的答案。
未实现的请求类型
如果 I/O 服务器遇到它无法识别的请求(也就是说,io_request
元组具有预期的格式,但 Request
是未知的),则 I/O 服务器应发送带有错误元组的有效回复
{error, request}
这使得可以使用可选请求来扩展协议,并使客户端在某种程度上向后兼容。
带注释的工作示例 I/O 服务器
I/O 服务器是任何能够处理 I/O 协议的进程。没有通用的 I/O 服务器行为,但可以有。该框架很简单,一个进程处理传入的请求,通常既有 I/O 请求,也有其他特定于 I/O 设备的请求(定位、关闭等)。
示例 I/O 服务器将字符存储在 ETS 表中,组成一个相当粗糙的 RAM 文件。
该模块以通常的指令、启动 I/O 服务器的函数和处理请求的主循环开始
-module(ets_io_server).
-export([start_link/0, init/0, loop/1, until_newline/3, until_enough/3]).
-define(CHARS_PER_REC, 10).
-record(state, {
table,
position, % absolute
mode % binary | list
}).
start_link() ->
spawn_link(?MODULE,init,[]).
init() ->
Table = ets:new(noname,[ordered_set]),
?MODULE:loop(#state{table = Table, position = 0, mode=list}).
loop(State) ->
receive
{io_request, From, ReplyAs, Request} ->
case request(Request,State) of
{Tag, Reply, NewState} when Tag =:= ok; Tag =:= error ->
reply(From, ReplyAs, Reply),
?MODULE:loop(NewState);
{stop, Reply, _NewState} ->
reply(From, ReplyAs, Reply),
exit(Reply)
end;
%% Private message
{From, rewind} ->
From ! {self(), ok},
?MODULE:loop(State#state{position = 0});
_Unknown ->
?MODULE:loop(State)
end.
主循环接收来自客户端的消息(客户端可以使用 io
模块发送请求)。对于每个请求,调用函数 request/2
,并最终使用函数 reply/3
发送回复。
“私有”消息 {From, rewind}
会导致伪文件中的当前位置重置为 0
(“文件”的开头)。这是一个 I/O 设备特定消息不属于 I/O 协议的典型示例。通常,将此类私有消息嵌入 io_request
元组中不是一个好主意,因为这会使读者感到困惑。
首先,我们检查回复函数
reply(From, ReplyAs, Reply) ->
From ! {io_reply, ReplyAs, Reply}.
它将 io_reply
元组发送回客户端,提供请求中收到的元素 ReplyAs
以及请求的结果,如前所述。
我们需要处理一些请求。首先是写入字符的请求
request({put_chars, Encoding, Chars}, State) ->
put_chars(unicode:characters_to_list(Chars,Encoding),State);
request({put_chars, Encoding, Module, Function, Args}, State) ->
try
request({put_chars, Encoding, apply(Module, Function, Args)}, State)
catch
_:_ ->
{error, {error,Function}, State}
end;
Encoding
说明请求中的字符是如何表示的。我们想将字符作为列表存储在 ETS 表中,因此我们使用函数 unicode:characters_to_list/2
将它们转换为列表。转换函数方便地接受编码类型 unicode
和 latin1
,因此我们可以直接使用 Encoding
。
当提供 Module
、Function
和 Arguments
时,我们应用它,并对结果执行相同的操作,就像直接提供数据一样。
我们处理检索数据的请求
request({get_until, Encoding, _Prompt, M, F, As}, State) ->
get_until(Encoding, M, F, As, State);
request({get_chars, Encoding, _Prompt, N}, State) ->
%% To simplify the code, get_chars is implemented using get_until
get_until(Encoding, ?MODULE, until_enough, [N], State);
request({get_line, Encoding, _Prompt}, State) ->
%% To simplify the code, get_line is implemented using get_until
get_until(Encoding, ?MODULE, until_newline, [$\n], State);
这里我们作弊了一点,或多或少只实现了 get_until
,并使用内部辅助函数来实现 get_chars
和 get_line
。在生产代码中,这可能效率低下,但这取决于不同请求的频率。在我们开始实现函数 put_chars/2
和 get_until/5
之前,我们检查剩余的几个请求
request({get_geometry,_}, State) ->
{error, {error,enotsup}, State};
request({setopts, Opts}, State) ->
setopts(Opts, State);
request(getopts, State) ->
getopts(State);
request({requests, Reqs}, State) ->
multi_request(Reqs, {ok, ok, State});
请求 get_geometry
对于此 I/O 服务器没有意义,因此回复是 {error, enotsup}
。我们处理的唯一选项是 binary
/list
,这在单独的函数中完成。
多请求标签(requests
)在一个单独的循环函数中处理,该函数将列表中的请求一个接一个地应用,返回最后一个结果。
如果请求无法识别,则必须返回 {error, request}
request(_Other, State) ->
{error, {error, request}, State}.
接下来,我们处理不同的请求,首先是相当通用的多请求类型
multi_request([R|Rs], {ok, _Res, State}) ->
multi_request(Rs, request(R, State));
multi_request([_|_], Error) ->
Error;
multi_request([], Result) ->
Result.
我们一次循环遍历一个请求,当遇到错误或列表耗尽时停止。最后一个返回值将发送回客户端(它首先返回到主循环,然后由函数 io_reply
发送回)。
请求 getopts
和 setopts
也易于处理。我们只更改或读取状态记录
setopts(Opts0,State) ->
Opts = proplists:unfold(
proplists:substitute_negations(
[{list,binary}],
Opts0)),
case check_valid_opts(Opts) of
true ->
case proplists:get_value(binary, Opts) of
true ->
{ok,ok,State#state{mode=binary}};
false ->
{ok,ok,State#state{mode=binary}};
_ ->
{ok,ok,State}
end;
false ->
{error,{error,enotsup},State}
end.
check_valid_opts([]) ->
true;
check_valid_opts([{binary,Bool}|T]) when is_boolean(Bool) ->
check_valid_opts(T);
check_valid_opts(_) ->
false.
getopts(#state{mode=M} = S) ->
{ok,[{binary, case M of
binary ->
true;
_ ->
false
end}],S}.
按照惯例,所有 I/O 服务器都处理 {setopts, [binary]}
、{setopts, [list]}
和 {setopts,[{binary, boolean()}]}
,因此可以使用 proplists:substitute_negations/2
和 proplists:unfold/1
的技巧。如果发送给我们无效的选项,我们会向客户端发送 {error, enotsup}
。
请求 getopts
将返回一个 {Option, Value}
元组的列表。这具有双重功能,既提供当前值,又提供此 I/O 服务器的可用选项。我们只有一个选项,因此返回它。
到目前为止,此 I/O 服务器相当通用(除了主循环中处理的请求 rewind
和 ETS 表的创建)。大多数 I/O 服务器都包含与此类似的代码。
为了使示例可运行,我们开始实现从 ETS 表中读取和写入数据。首先是函数 put_chars/3
put_chars(Chars, #state{table = T, position = P} = State) ->
R = P div ?CHARS_PER_REC,
C = P rem ?CHARS_PER_REC,
[ apply_update(T,U) || U <- split_data(Chars, R, C) ],
{ok, ok, State#state{position = (P + length(Chars))}}.
我们已经有了 (Unicode) 列表形式的数据,因此只需将列表拆分为预定义大小的运行,并将每个运行置于当前位置(并向前)。函数 split_data/3
和 apply_update/2
在下面实现。
现在我们要从表中读取数据。函数 get_until/5
读取数据并应用该函数,直到它表示已完成。结果将发送回客户端
get_until(Encoding, Mod, Func, As,
#state{position = P, mode = M, table = T} = State) ->
case get_loop(Mod,Func,As,T,P,[]) of
{done,Data,_,NewP} when is_binary(Data); is_list(Data) ->
if
M =:= binary ->
{ok,
unicode:characters_to_binary(Data, unicode, Encoding),
State#state{position = NewP}};
true ->
case check(Encoding,
unicode:characters_to_list(Data, unicode))
of
{error, _} = E ->
{error, E, State};
List ->
{ok, List,
State#state{position = NewP}}
end
end;
{done,Data,_,NewP} ->
{ok, Data, State#state{position = NewP}};
Error ->
{error, Error, State}
end.
get_loop(M,F,A,T,P,C) ->
{NewP,L} = get(P,T),
case catch apply(M,F,[C,L|A]) of
{done, List, Rest} ->
{done, List, [], NewP - length(Rest)};
{more, NewC} ->
get_loop(M,F,A,T,NewP,NewC);
_ ->
{error,F}
end.
在这里,我们还处理可以通过请求 setopts
设置的模式(binary
或 list
)。默认情况下,所有 OTP I/O 服务器都会将数据作为列表发送回客户端,但如果 I/O 服务器以适当的方式处理,则将模式切换为 binary
可以提高效率。 get_until
的实现很难高效,因为提供的函数被定义为接受列表作为参数,但是可以为二进制模式优化 get_chars
和 get_line
。但是,此示例不会优化任何内容。
但重要的是,返回的数据类型要正确,具体取决于设置的选项。因此,我们在返回之前,如果可能,将列表转换为正确的编码中的二进制文件。在 get_until
请求元组中提供的函数可以将其最终结果返回任何内容,因此只有返回列表的函数才能将其转换为二进制文件。如果请求包含编码标签 unicode
,则列表可以包含所有 Unicode 代码点,并且二进制文件应为 UTF-8。如果编码标签为 latin1
,则客户端只能获取范围 0..255
中的字符。如果编码指定为 latin1
,则函数 check/2
负责不返回列表中任意的 Unicode 代码点。如果函数不返回列表,则无法执行检查,结果是不受影响的提供函数的结果。
为了操作表格,我们实现以下实用函数
check(unicode, List) ->
List;
check(latin1, List) ->
try
[ throw(not_unicode) || X <- List,
X > 255 ],
List
catch
throw:_ ->
{error,{cannot_convert, unicode, latin1}}
end.
如果客户端请求 latin1
,则如果返回的 Unicode 代码点 > 255,则函数 check 将提供错误元组。
两个函数 until_newline/3
和 until_enough/3
是与函数 get_until/5
一起使用的辅助函数,用于(低效地)实现 get_chars
和 get_line
until_newline([],eof,_MyStopCharacter) ->
{done,eof,[]};
until_newline(ThisFar,eof,_MyStopCharacter) ->
{done,ThisFar,[]};
until_newline(ThisFar,CharList,MyStopCharacter) ->
case
lists:splitwith(fun(X) -> X =/= MyStopCharacter end, CharList)
of
{L,[]} ->
{more,ThisFar++L};
{L2,[MyStopCharacter|Rest]} ->
{done,ThisFar++L2++[MyStopCharacter],Rest}
end.
until_enough([],eof,_N) ->
{done,eof,[]};
until_enough(ThisFar,eof,_N) ->
{done,ThisFar,[]};
until_enough(ThisFar,CharList,N)
when length(ThisFar) + length(CharList) >= N ->
{Res,Rest} = my_split(N,ThisFar ++ CharList, []),
{done,Res,Rest};
until_enough(ThisFar,CharList,_N) ->
{more,ThisFar++CharList}.
可以看出,上面的函数正是要在 get_until
请求中提供的函数类型。
为了完成 I/O 服务器,我们只需要以适当的方式读取和写入表格
get(P,Tab) ->
R = P div ?CHARS_PER_REC,
C = P rem ?CHARS_PER_REC,
case ets:lookup(Tab,R) of
[] ->
{P,eof};
[{R,List}] ->
case my_split(C,List,[]) of
{_,[]} ->
{P+length(List),eof};
{_,Data} ->
{P+length(Data),Data}
end
end.
my_split(0,Left,Acc) ->
{lists:reverse(Acc),Left};
my_split(_,[],Acc) ->
{lists:reverse(Acc),[]};
my_split(N,[H|T],Acc) ->
my_split(N-1,T,[H|Acc]).
split_data([],_,_) ->
[];
split_data(Chars, Row, Col) ->
{This,Left} = my_split(?CHARS_PER_REC - Col, Chars, []),
[ {Row, Col, This} | split_data(Left, Row + 1, 0) ].
apply_update(Table, {Row, Col, List}) ->
case ets:lookup(Table,Row) of
[] ->
ets:insert(Table,{Row, lists:duplicate(Col,0) ++ List});
[{Row, OldData}] ->
{Part1,_} = my_split(Col,OldData,[]),
{_,Part2} = my_split(Col+length(List),OldData,[]),
ets:insert(Table,{Row, Part1 ++ List ++ Part2})
end.
表格以 ?CHARS_PER_REC
的块读取或写入,并在必要时覆盖。该实现显然效率不高,它只是在工作。
这结束了示例。它是完全可运行的,您可以通过使用例如 io
模块甚至 file
模块来读取或写入 I/O 服务器。在 Erlang 中实现一个功能齐全的 I/O 服务器就是这么简单。