Распределенное программирование в Erlang
В Эрланге:
создание и уничтожение процессов очень быстрое;
посылка сообщений между процессами очень быстрая;
процессы ведут себя одинаково во всех операционных системах;
может быть очень большое количество процессов;
процессы не разделяют память и являются полностью независимыми;
единственный способ для взаимодействия процессов — это через передачу сообщений.
По этим причинам Эрланг иногда называют языком с чистой передачей сообщений.
В Эрланге программировать процессы легко. Для этого нужно только три примитива: spawn , send, receive .
Pid = spawn(Fun)
Создаёт новый параллельный процесс, который вычисляет (evaluates) Fun . Новый
процесс работает параллельно с вызвавшим его. Spawn возвращает Pid (сокращение
для идентификатор процесса). Вы можете использовать Pid для посылки сообщений
процессу.
Pid ! Message
Посылает сообщение Message процессу с идентификатором Pid . Посылка сообщения
асинхронна. Отправитель не ждёт, а продолжает делать то, чем занимался. Восклицательный знак !
еще называется оператором send .
Pid ! M определяется как M — примитив отправки сообщения ! возвращает само
сообщение. Поэтому Pid1 ! Pid2 ! ... ! M означает отправку сообщения M всем
процессам — Pid1 , Pid2 и т. д.
receive ... end
Принимает сообщение, которое было послано процессу. У него следующий синтаксис:
receive
Pattern1 [when Guard1] ->
Expressions1;
Pattern2 [when Guard2] ->
Expressions2;
...
end
Когда сообщение прибывает к процессу система пытается сопоставить его с образцом
Pattern1 (возможно с учётом условия Guard1 ). Если это выполнилось успешно, то
она вычисляет выражение Expression1 . Если первый образец не совпадает, то она
использует Pattern2 и т.д. Если ни один из образцов не соответствует, сообщение
сохраняется для последующей обработки, а процесс ожидает следующего сообщения.
Напишем небольшую программу, которая создает процесс, в котором вычисляется площадь прямоугольника
и круга, и пошлем этому процессу три сообщения:
-module(area_server0).
-compile([export_all]).
loop() ->
receive
{rectangle, Width, Ht} ->
io:format("Area of rectangle is ~p~n",[Width * Ht]),
loop();
{circle, R} ->
io:format("Area of circle is ~p~n", [3.14159 * R * R]),
loop();
Other ->
io:format("I don't know what the area of a ~p is ~n",[Other]),
loop()
end.
main() ->
Pid = spawn(fun area_server0:loop/0),
Pid ! {rectangle, 6, 10},
Pid ! {circle, 23},
Pid ! {triangle,2,4,5},
io:format("start ~n",[]).
Собрать и запустить такую программу можно двумя командами:
erlc area_server0.erl
erl -noshell -s area_server0 main -s init stop
Архитектуры клиент-сервер центральные в Эрланге.
Клиент и сервер в клиент-серверной архитектуре — это раздельные процессы, и для связи
между клиентом и сервером используется обычная передача сообщений Эрланга. Как
клиент, так и сервер могут работать на одной и той же машине или на двух разных машинах.
Слова клиент и сервер ссылаются на роли, которые выполняют эти два процесса.
Клиент всегда начинает вычисление отправляя запрос к серверу. Сервер вычисляет
ответ и отправляет отзыв клиенту.
Теперь мы слегка переделаем предыдущую программу.
Нам нужно послать ответ процессу, который послал первоначальный запрос.
Отправитель должен включить обратный адрес в сообщение. Этого можно достичь так:
Pid ! {self(),{rectangle, 6, 10}}
self() - это PID клиентского процесса.
Процесс, который посылает начальный запрос называется клиентом. Процесс,
который принимает запрос и отправляет ответ называется сервером.
Мы добавили маленькую полезную функцию, названную rpc (сокращение для
remote procedure call — удалённый вызов процедуры), которая включает в себя
посылку запроса на сервер и ожидание ответа:
Между клиентом и сервером устанавливается диалог, в который не смогут вмешаться другие процессы:
-module(area_server1).
-compile([export_all]).
start() -> spawn(fun loop/0).
area(Pid, What) ->
rpc(Pid, What).
rpc(Pid, Request) ->
Pid ! {self(), Request},
receive
{Pid, Response} ->
Response
end.
loop() ->
receive
{From, {rectangle, Width, Ht}} ->
From ! {self(), Width * Ht},
io:format("Area of rectangle is ~p~n",[Width * Ht]),
loop();
{From, {circle, R}} ->
From ! {self(), 3.14159 * R * R},
io:format("Area of circle is ~p~n", [3.14159 * R * R]),
loop();
{From, Other} ->
From ! {self(), {error,Other}},
io:format("ups ...\n", []),
loop()
end.
main() ->
io:format("start area_server1\n",[]),
Pid = area_server1:start(),
area_server1:area(Pid, {rectangle, 10, 8}),
area_server1:area(Pid, {blablabla, 10, 8}),
area_server1:area(Pid, {circle, 4}).
Теперь напишем тестовую программу, которая определит, сколько вообще на вашей машине можно создавать
эрланговских процессов:
-module(processes).
-compile([export_all]).
max(N) ->
Max = erlang:system_info(process_limit),
io:format("Maximum allowed processes:~p~n",[Max]),
statistics(runtime),
statistics(wall_clock),
L = for(1, N, fun() -> spawn(fun() -> wait() end) end),
{_, Time1} = statistics(runtime),
{_, Time2} = statistics(wall_clock),
lists:foreach(fun(Pid) -> Pid ! die end, L),
U1 = Time1 * 1000 / N,
U2 = Time2 * 1000 / N,
io:format("Process spawn time=~p (~p) microseconds~n",
[U1, U2]).
wait() ->
receive
die -> void
end.
for(N, N, F) -> [F()];
for(I, N, F) -> [F()|for(I+1, N, F)].
main() ->
io:format("start ~n",[]),
max(20000),
max(10000),
io:format("finish ~n",[]).
Когда я запустил у себя эту программу, то получил следующее:
start
Maximum allowed processes:32768
Process spawn time=6.0 (7.5) microseconds
Maximum allowed processes:32768
Process spawn time=5.0 (6.0) microseconds
finish
На моей машинке создание 20000 процессов требует 6 микросекунд.
По умолчанию в эрланге установлен минимум в 32768 процессов.
Эрлангу можно сказать при запуске увеличить этот минимум, например:
erl +P 500000
В эрланге на стороне приема можно организовать таймер.
-module(stimer).
-compile([export_all]).
start(Time, Fun) -> spawn(fun() -> timer(Time, Fun) end).
cancel(Pid) -> Pid ! cancel.
timer(Time, Fun) ->
receive
cancel ->
void
after Time ->
Fun()
end.
main() ->
Pid = stimer:start(5000, fun() -> io:format("timer event~n") end),
io:format("Pid = ~w~n",[Pid]),
Pid1 = stimer:start(25000, fun() -> io:format("timer event~n") end),
io:format("Pid1 = ~w~n",[Pid1]),
stimer:cancel(Pid1).
Первый раз мы ждем пять секунд, чтобы сработал таймер.
Потом запускаем второй таймер и тут же отменяем его.
Каждый процесс в Эрланге имеет свой собственный почтовый ящик. Когда вы
посылаете сообщение процессу, это сообщение помещается в почтовый ящик.
Почтовый ящик проверяется только тогда, когда программа вычисляет оператор
receive:
receive
Pattern1 [when Guard1] ->
Expressions1;
Pattern2 [when Guard1] ->
Expressions1;
...
after Time ->
ExpressionTimeout
end.
receive работает следующим образом:
1. Когда мы входим в оператор receive , мы запускаем таймер (но только, если в
выражении присутствует секция after ).
2. Взять первое сообщение из почтового ящика и попытаться соотнести его с
образцами Pattern1 , Pattern2 и т.д. Если соответствие успешно, то сообщение
удаляется из почтового ящика и вычисляется выражение, следующее за
образцом.
3. Если ни один из образцов в операторе receive не соответствует первому
сообщению из почтового ящика, то первое сообщение удаляется из ящика и
помещается в «отложенную очередь» (save queue). Затем так же проверяется
второе сообщение. Эта процедура повторяется до тех пор, пока не будет найдено
совпадающее сообщение, либо не будут проверены все сообщения из почтового
ящика.
4. Если ни одно сообщение из почтового ящика не соответствует, процесс
приостанавливается и ждёт до тех пор, пока новое сообщение не будет помещено
в почтовый ящик. Заметьте, что когда новое сообщение прибывает, сообщения из
отложенной очереди не проверяются заново на соответствие образцам.
Проверяется только новое сообщение.
5. Как только сообщение совпало с образцом, сразу после этого все сообщения из
отложенной очереди помещаются обратно в почтовый ящик в том же порядке, в
каком они прибыли к процессу. Если был установлен таймер, то он очищается.
6. Если таймер истёк, пока мы ждали сообщение, то выполнится выражение
ExpressionTimeout, после чего все отложенные сообщения поместятся обратно в
почтовый ящик в том же порядке, в каком они прибыли к процессу.
У Эрланга есть метод публикации идентификатора процесса, так что любой процесс в
системе может общаться с этим процессом. Такой процесс называется
зарегистрированным процессом. Есть четыре встроенные функции (BIF) для
управления зарегистрированными процессами:
register(AnAtom, Pid)
- зарегистрировать процесс Pid с именем AnAtom .
unregister(AnAtom)
- удалить любые регистрации, связанные с AnAtom .
Следующий шаблон полезен для написания параллельных программ:
-module(ctemplate).
-compile(export_all).
start() ->
spawn(fun() -> loop([]) end).
rpc(Pid, Request) ->
Pid ! {self(), Request},
receive
{Pid, Response} ->
Response
end.
loop(X) ->
receive
Any ->
io:format("Received:~p~n",[Any]),
loop(X)
end.
Здесь цикл приёма — это просто пустой цикл, который принимает и печатает все сообщения,
В эрланге один процесс может проверить статус другого процесса - жив тот или нет.
Для этого есть встроенная функция Эрланга link, в которой параметром является
идентификатор процесса Pid.
После установления связи оба процесса неявно следят друг за другом. Если
умрёт процесс А , то процесс B получит сигнал выхода (exit signal) . И наоборот — если умрёт B ,
то такой сигнал получит A. Это работает как для одной локальной машины, так и для сетевых машин.
Можно выполнить некое действие, когда процесс завершается. Можно написать
функцию on_exit(Pid,Fun) , которая устанавливает связь с процессом Pid . Если
Pid умирает с причиной Why , то вычисляется функция Fun(Why):
on_exit(Pid, Fun) ->
spawn(fun() ->
process_flag(trap_exit,true),
link(Pid),
receive
{'EXIT', Pid, Why} ->
Fun(Why)
end
end).
Следующий пример создает процесс, устанавливаем обработчик on_exit для мониторинга этого процесса,
Сначала определим функцию F , которая ждёт единственное сообщение X и затем вычисляет list_to_atom(X),
затем создаем процесс, устанавливаем обработчик on_exit, затем посылаем этому процессу не список,
который он ожидает, а просто атом, в результате процесс умирает по ошибке, и тут же автоматически вызывается
on_exit.
-module(on_exit).
-compile([export_all]).
on_exit(Pid, Fun) ->
spawn(fun() ->
process_flag(trap_exit,true),
io:format(" I died with:~p~n ...",[Pid]),
link(Pid),
receive
{'EXIT', Pid, Why} ->
Fun(Why)
end
end).
main() ->
F = fun() -> receive X -> list_to_atom(X) end end,
Pid = spawn(F),
on_exit(Pid, fun(Why) -> io:format(" ~p died with:~p~n",[Pid, Why])end),
Pid ! hello.
Когда родительский процесс создает дочерний процесс, и дочерний процесс по какой-то причине падает,
для дальнейшего поведения родителя есть следующий 3 варианта:
1. Если родитель никак не хочет реагировать на падение дочернего процесса, то дочерний процес создается
с помощью команды
Pid = spawn(fun() -> ... end)
2. Если нужно, чтобы родитель умер сразу после того, как умрет дочерний процесс, то последний должен быть создан
следующей командой:
Pid = spawn_link(fun() -> ... end)
3. Если родитель должен обработать ошибки в случае падения дочернего процесса,
нужно использовать следующую схему:
process_flag(trap_exit, true),
Pid = spawn_link(fun() -> ... end),
...
loop(...).
loop(State) ->
receive
{'EXIT', SomePid, Reason} ->
%% do something with the error
loop(State1);
...
end.
Теперь процесс, вычисляющий loop , перехватывает выход и не умрёт, если упадёт
связанный с ним другой процесс. Он увидит все сигналы выхода (преобразованные в
сообщения) от умирающего процесса и сможет предпринять все необходимые
действия, когда обнаружит сбой.
Монитор — это однонаправленная связь. Если процесс A мониторит процесс B , и
процесс B умирает, то к А будет послан сигнал выхода. Однако, если А умирает, то к
B не будет послано никакого сигнала выхода. Полное описание возможностей
монитора можно найти в руководстве по Эрлангу.
Пример
Теперь пришло время написать простейшую систему клиент-сервер. Это будет сервер имен -
программа, которая, получив имя, возвращает значение, связанное с этим именем.
Мы также можем менять значение, связанное с определённым именем:
-module(kvs).
-compile([export_all]).
start() -> register(kvs, spawn(fun() -> loop() end)).
store(Key, Value) -> rpc({store, Key, Value}).
lookup(Key) -> rpc({lookup, Key}).
rpc(Q) ->
kvs ! {self(), Q},
receive
{kvs, Reply} ->
Reply
end.
loop() ->
receive
{From, {store, Key, Value}} ->
put(Key, {ok, Value}),
From ! {kvs, true},
loop();
{From, {lookup, Key}} ->
From ! {kvs, get(Key)},
loop()
end.
main() ->
io:format(" node:~p~n",[node()]).
start(),
store({location, joe}, "Stockholm"),
store(weather, raining),
io:format(" lookup:~p~n",[lookup(weather)]),
io:format(" lookup:~p~n",[lookup({location, joe})]),
io:format(" lookup:~p~n",[lookup({location, jane})]).
Его нужно запустить с помощью следующего батника - вместо serg вы можете поставить любое имя:
Имя узла имеет вид Name@Host. Name и Host — это атомы и если они содержат
какие-либо не атомные символы, то такие атомы должны быть в одинарных
кавычках.
erlc kvs.erl
erl -sname serg -noshell -s kvs main
После запуска сервера терминал выведет что-то типа:
node:serg@blablabla
Вам вот это вот имя ноды serg@blablabla понадобится для того, чтобы вставить
его в клиентский код.
Теперь напишем простого клиента, который будет делать запросы к этому серверу.
Не забудьте вставить имя ноды. Выглядит он так-
-module(kvc).
-compile([export_all]).
main() ->
io:format(" node:~p~n",[node()]),
io:format(" lookup:~p~n",[rpc:call(serg@blablabla, kvs, store, [weather, fine])]),
io:format(" lookup:~p~n",[rpc:call(serg@blablabla, kvs, lookup, [weather])]).
Для запуска клиента вам понадобится - внимание - второй терминал на этой же машине, чтобы полностью
смоделировать распределенную среду.
Теперь осталось запустить батник из командной строки, который соберет клиента и запустит его:
erlc kvc.erl
erl -sname serg2 -noshell -s kvc main -s init stop
Вывод клиента должен быть типа:
node:serg2@blablabla
lookup:true
lookup:{ok,fine}
Так на одной локальной машине можно тестировать распределенное эрланговское приложение.
Если клиент и сервер расположены на разных машинах в пределах одной локальной сети,
интерпретатор нужно запускать на них с одинаковыми куками:
erl -name serg -noshell -setcookie blablabla -s kvs main
erl -name serg2 -noshell -setcookie blablabla -s kvc main -s init stop
Параметр sname можно использовать, когда машины в одной подсети, параметр name - в разных подсетях.
У каждого узла куки должны быть одинаковы в целях безопасности.
Набор соединённых узлов, имеющих одинаковые куки, образует эрланговый кластер.
|