Goroutine - функция, которая может согласованно выполняться с другими функциями.
Для создания рутины используется ключевое слово go.
Channel - канал, по которому можно передавать данные, в частности между рутинами, и синхронизировать их работу.
Канал может иметь произвольный тип.
Каналы можно использовать где угодно: в качестве полей структур, параметров функций,
Каналы используются в том числе для синхронизации рутин.
В следующем примере создается один двунаправленный канал, который передается в качестве параметра.
Он организует (или синхронизирует) работу между тремя рутинами. Две рутины отсылают данные в канал, а третья принимает.
Оператор <- служит для отсылки данных в канал и приемки данных из канала.
Если этот оператор стоит слева от переменной, обозначающей канал, это значит, что канал принимает данные извне.
Если оператор стоит справа от переменной, это означает отсылку данных:
func sender(c chan int) {
for i := 0; i< 11; i++ {
c <- i
}
}
func sender2(c chan int) {
for i := 11; i< 20; i++ {
c <- i
}
}
func receiver(c chan int) {
for {
msg := <- c
fmt.Println(msg)
}
}
func main() {
var c chan int = make(chan int)
go sender(c)
go sender2(c)
go receiver(c)
var input string
fmt.Scanln(&input)
}
Канал можно сделать однонаправленным - для этого нужно изменить заголовок функции:
func receiver(c <- chan int) {
В первом примере в качестве рутины выступает именованная функция.
Анонимная функция также может выступать в качестве рутины.
В следующем примере мы используем второй вариант. Будут созданы два канала, два сендера и один получатель.
Отличие второго примера от первого в том, что получатель будет принимать данные не из одного, а из двух каналов.
Для этого используется выражение, аналогичное switch, но только для каналов:
select
func main() {
var c1 chan int = make(chan int)
var c2 chan int = make(chan int)
go func () {
for i := 0; i< 11; i++ {
c1 <- i
}
}()
go func () {
for i := 11; i< 20; i++ {
c2 <- i
}
}()
go func () {
for {
select {
case num1 := <- c1:
fmt.Println(num1)
case num2 := <- c2:
fmt.Println(num2)
}
}
}()
var input string
fmt.Scanln(&input)
}
В приведенных примерах использовались небуфферизованные каналы, их особенность в том, что это синхронные каналы, в том смысле,
что если мы попытаемся записать данные в уже непустой канал, операция записи заблокирует программу до тех пор,
пока данные оттуда не будут прочитаны. Эту особенность можно проиллюстрировать следующим примером - в нем создается небуферизованный
канал, делается попытка записать в него сразу три сообщения, что блокируется после первой же записи в канал до тех пор,
пока мы не начинаем читать из него:
message := make(chan string) // no buffer
count := 3
go func() {
for i := 1; i <= count; i++ {
fmt.Println("send message")
message <- fmt.Sprintf("message %d", i)
}
}()
time.Sleep(time.Second * 3)
for i := 1; i <= count; i++ {
fmt.Println(<-message)
}
Буфферизованные асинхронные каналы создаются с дополнительным параметром - capacity.
Если мы сделаем попытку записать в канал или прочитать из канала сообщений больше, чем его буфер, произойдет дедлок:
c := make(chan int, 2)
c <- 1
c <- 2
c <- 3
fmt.Println(<-c)
fmt.Println(<-c)
Чтобы этого не происходило, канал нужно закрывать командой close.
В следующем примере мы создаем канал с буфером=10, создаем рутину, в которой заполняем этот канал и закрываем канал,
а потом читаем из него. Буферизованный канал позволяет делать итерацию, но перед этим канал нужно закрывать.
После чего чтение из закрытого канала - неблокирующая операция:
func fibonacci(n int, c chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
close(c)
}
func main() {
c := make(chan int, 10)
go fibonacci(cap(c), c)
for i := range c {
fmt.Println(i)
}
}
Иногда возникают ситуация, когда канал по каким-то причинам задерживает отдачу.
В этом случае можно использовать timeout для ограничения времени отклика.
В следующем примере делается http-запрос. Создаются два канала - один для ответа и второй для ошибки.
С помощью оператора select обрабатывается возможные сценарии,
когда ответ на запрос из канала может вернуть ошибку либо задержаться:
response := make(chan *http.Response, 1)
errors := make(chan *error)
go func() {
resp, err := http.Get("http://iakovlev.org/")
if err != nil {
errors <- &err
}
response <- resp
}()
for {
select {
case r := <-response:
fmt.Printf("%s", r.Body)
return
case err := <-errors:
log.Fatal(err)
case <-time.After(100 * time.Millisecond):
fmt.Printf("Timed out!")
return
}
}
В гоу concurrency - это не параллелизм. Для обьяснения этого парадокса рассмотрим код, в котором не будет
ни одной рутины, в нем будут две анонимных функции, которые по всем законам жанра будут выполняться строго последовательно -
именно в том порядке, в котором они прописаны:
fmt.Println("Start")
func() {
time.Sleep(1000000 * time.Microsecond)
for char := 'a'; char < 'a'+26; char++ {
fmt.Printf("%c ", char)
}
}()
println()
func() {
for number := 1; number < 27; number++ {
fmt.Printf("%d ", number)
}
}()
println()
fmt.Println("End")
Вывод:
Start
a b c d e f g h i j k l m n o p q r s t u v w x y z
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
End
Вывод вполне предсказуем, при этом обратите внимание на тот факт, что во время паузы в первой анонимной функции
программа будет заблокирована на период ожидания.
Перепишем этот пример - анонимные функции сделаем рутинами с помощью go и добавим
синхронизацию:
var wg sync.WaitGroup
wg.Add(2)
fmt.Println("Start")
go func() {
defer wg.Done()
time.Sleep(1000000 * time.Microsecond)
for char := 'a'; char < 'a'+26; char++ {
fmt.Printf("%c ", char)
}
}()
go func() {
defer wg.Done()
for number := 1; number < 27; number++ {
fmt.Printf("%d ", number)
}
println()
}()
wg.Wait()
println()
fmt.Println("End")
Вывод:
Start
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
a b c d e f g h i j k l m n o p q r s t u v w x y z
End
Порядок работы и вывод существенно меняются - сначала выполнится вторая рутина, а потом первая, т.е. разница в том,
что теперь первая рутина не блокирует ход программы, у нас есть возможность автоматически переключиться между рутинами
в том случае, если одна занята ожиданием. Т.е. concurrency дает возможность использовать ресурсы на сто процентов.
Когда мы говорим, что concurrency - это не параллелизм, мы имеем ввиду тот факт, что в гоу по умолчанию программа выполняется
одним процессором. Если мы ходим использовать больше одного ядра, т.е. сделать программу параллельной,
мы должны сделать обьявление - runtime.GOMAXPROCS(2):
runtime.GOMAXPROCS(2)
var wg sync.WaitGroup
wg.Add(2)
fmt.Println("Start")
go func() {
defer wg.Done()
for char := 'a'; char < 'a'+26; char++ {
fmt.Printf("%c ", char)
}
}()
go func() {
defer wg.Done()
for number := 1; number < 27; number++ {
fmt.Printf("%d ", number)
}
println()
}()
wg.Wait()
println()
fmt.Println("End")
Вывод:
Start
1 2 3 4 5 a 6 b c d e f g h i j k l m n o p 7 q r 8 s t u v w x y z 9 10 ...
End
В данном случае это будет уже программа с двумя рутинами, выполняемыми параллельно на разных ядрах,
и каждый раз будет генерироваться непредсказуемый вывод.
Race condition - ситуация, когда многопоточная программа дает непредсказуемый результат.
Такой результат зачастую невозможно воспроизвести.
Одна из разновидностей - data race - случается тогда, когда две рутины получают одновременный доступ на запись
к одной и той же переменной. Есть три варианта для обхода data race:
1. Сделать доступ этой переменной только на чтение - правда, в большинстве случаев это не подходит
2. Сделать доступ на запись в переменную только для одной рутины
3. Можно использовать мьютекс, у которого есть методы блокирования и разблокирования:
mu sync.Mutex
mu.Lock()
...
mu.Unlock()
Область кода между Lock() и Unlock() называется критической секцией.
Функции с такими секциями называются мониторами.
Иногда возникает ситуация, когда вызов Unlock() полезно ложить в отложенную функцию defer, чтобы быть уверенным
в гарантированной разблокировке - в следующем примере разблокировка произойдет уже после того, как функция вернет
значение - поэтому это concurrency-safe функция, даже если в функции случится паника и там будет стоять вызов рекавери,
разблокировка все равно сработает :
func Balance() int {
mu.Lock()
defer mu.Unlock()
return balance
}
Простая ситуация, когда возможен дедлок: есть две функции, одна вызывается внутри другой,
обе функции используют один и тот же мьютекс для блокировки, при вызове вложенной функции и произойдет дедлок,
потому что у мьютекса нельзя вызывать два раза подряд один и тот же метод
mu.Lock()
Есть специальный тип мьютекса с эксклюзивным доступом на запись, но он работает медленнее обычного:
var mu sync.RWMutex
В пакете sync есть еще один обьект
sync.Once
Он включает в себя одновременно мьютекс и булевский флаг. У него есть метод
once.Do(oblect)
При первом вызове этого метода флаг установится в true, при дальнейших вызовах чтение расшаренного в памяти обьекта будет
оптимизировано.
В майнфреймовых языках считается правилом хорошего тона использование блокировок с помощью мьютексов.
Гоу также предоставляет аналогичную возможность. Кроме этого, в гоу есть другая возможность для синхронизации -
для этого можно использовать каналы.
В следующем примере показано, как работать с "глобальным" словарем, при этом мьютексы не используются.
В словаре создается два счетчика, затем они наращиваются, после чего выводятся на экран.
На самом деле словарь здесь не глобален, а локален внутри одной функции, но виден он отовсюду.
Канал, по которому происходит обмен, предоставляет доступ к этому словарю из любой точки программы.
Этот небуферизованный - синхронный - канал является одновременно синронизатором доступа к словарю:
package main
import "sync"
type request struct {
key int
value int
op string
ret chan int
}
func set(c chan request, key int) {
c <- request{key, 0, "set", nil}
}
func get(c chan request, key int) int {
result := make(chan int)
c <- request{key, 0, "get", result}
return <-result
}
func add(c chan request, key int) {
c <- request{key, 0, "add", nil}
}
func runMap(c chan request) {
m := make(map[int] int)
for {
req := <- c
switch req.op {
case "set":
m[req.key] = 0
case "get":
req.ret <- m[req.key]
case "add":
m[req.key] += 1
}
}
}
func main() {
m := make(chan request)
go runMap(m)
set(m, 1 )
set(m, 2)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i< 100000; i++ {
add(m, 1)
}
}()
go func() {
defer wg.Done()
for i := 0; i< 200000; i++ {
add(m, 2)
}
}()
wg.Wait()
println(get(m, 1))
println(get(m, 2))
}