Search     or:     and:
 LINUX 
 Language 
 Kernel 
 Package 
 Book 
 Test 
 OS 
 Forum 
 iakovlev.org 
 Languages
 С
 GNU С Library 
 Qt 
 STL 
 Threads 
 C++ 
 Samples 
 stanford.edu 
 ANSI C
 Libs
 LD
 Socket
 Pusher
 Pipes
 Encryption
 Plugin
 Inter-Process
 Errors
 Deep C Secrets
 C + UNIX
 Linked Lists / Trees
 Asm
 Perl
 Python
 Shell
 Erlang
 Go
 Rust
 Алгоритмы
NEWS
Последние статьи :
  Тренажёр 16.01   
  Эльбрус 05.12   
  Алгоритмы 12.04   
  Rust 07.11   
  Go 25.12   
  EXT4 10.11   
  FS benchmark 15.09   
  Сетунь 23.07   
  Trees 25.06   
  Apache 03.02   
 
TOP 20
 Linux Kernel 2.6...5170 
 Trees...938 
 Максвелл 3...870 
 Go Web ...823 
 William Gropp...802 
 Ethreal 3...787 
 Gary V.Vaughan-> Libtool...772 
 Ethreal 4...770 
 Rodriguez 6...763 
 Ext4 FS...755 
 Steve Pate 1...754 
 Clickhouse...753 
 Ethreal 1...742 
 Secure Programming for Li...731 
 C++ Patterns 3...716 
 Ulrich Drepper...696 
 Assembler...694 
 DevFS...660 
 Стивенс 9...649 
 MySQL & PosgreSQL...631 
 
  01.01.2024 : 3621733 посещений 

iakovlev.org

Concurrency

Goroutine - функция, которая может согласованно выполняться с другими функциями. Для создания рутины используется ключевое слово go.
Channel - канал, по которому можно передавать данные, в частности между рутинами, и синхронизировать их работу. Канал может иметь произвольный тип. Каналы можно использовать где угодно: в качестве полей структур, параметров функций, Каналы используются в том числе для синхронизации рутин.
В следующем примере создается один двунаправленный канал, который передается в качестве параметра. Он организует (или синхронизирует) работу между тремя рутинами. Две рутины отсылают данные в канал, а третья принимает. Оператор <- служит для отсылки данных в канал и приемки данных из канала. Если этот оператор стоит слева от переменной, обозначающей канал, это значит, что канал принимает данные извне. Если оператор стоит справа от переменной, это означает отсылку данных:

 func sender(c chan int) {
   for i := 0; i< 11; i++ {
     c <- i
   }
 }
 
 func sender2(c chan int) {
   for i := 11; i< 20; i++ {
     c <- i
   }
 }
 
 func receiver(c chan int) {	
   for {
     msg := <- c
     fmt.Println(msg)
   }
 }
 
 func main() {
   var c chan int = make(chan int)
 
   go sender(c)
   go sender2(c)
   go receiver(c)
 
   var input string
   fmt.Scanln(&input)
 }
Канал можно сделать однонаправленным - для этого нужно изменить заголовок функции:
func receiver(c <- chan int) {
В первом примере в качестве рутины выступает именованная функция. Анонимная функция также может выступать в качестве рутины. В следующем примере мы используем второй вариант. Будут созданы два канала, два сендера и один получатель. Отличие второго примера от первого в том, что получатель будет принимать данные не из одного, а из двух каналов. Для этого используется выражение, аналогичное switch, но только для каналов:
select

 func main() {
   var c1 chan int = make(chan int)
   var c2 chan int = make(chan int)
 
   go func () {
     for i := 0; i< 11; i++ {
       c1 <- i
     }
   }()
   
   go func () {
     for i := 11; i< 20; i++ {
       c2 <- i
     }
   }()
   
   go func () {
     for {
     select {
       case num1 := <- c1:
 	fmt.Println(num1)
       case num2 := <- c2:
 	fmt.Println(num2)
      }
     }
   }()
   
 
   var input string
   fmt.Scanln(&input)
 }
В приведенных примерах использовались небуфферизованные каналы, их особенность в том, что это синхронные каналы, в том смысле, что если мы попытаемся записать данные в уже непустой канал, операция записи заблокирует программу до тех пор, пока данные оттуда не будут прочитаны. Эту особенность можно проиллюстрировать следующим примером - в нем создается небуферизованный канал, делается попытка записать в него сразу три сообщения, что блокируется после первой же записи в канал до тех пор, пока мы не начинаем читать из него:

      message := make(chan string) // no buffer
      count := 3
      go func() {
           for i := 1; i <= count; i++ {
                fmt.Println("send message")
                message <- fmt.Sprintf("message %d", i)
           }
      }()
      time.Sleep(time.Second * 3)
      for i := 1; i <= count; i++ {
           fmt.Println(<-message)
      }
Буфферизованные асинхронные каналы создаются с дополнительным параметром - capacity.
Если мы сделаем попытку записать в канал или прочитать из канала сообщений больше, чем его буфер, произойдет дедлок:

     c := make(chan int, 2)
     c <- 1
     c <- 2
     c <- 3
     fmt.Println(<-c)
     fmt.Println(<-c)
 
Чтобы этого не происходило, канал нужно закрывать командой close. В следующем примере мы создаем канал с буфером=10, создаем рутину, в которой заполняем этот канал и закрываем канал, а потом читаем из него. Буферизованный канал позволяет делать итерацию, но перед этим канал нужно закрывать. После чего чтение из закрытого канала - неблокирующая операция:

 func fibonacci(n int, c chan int) {
     x, y := 0, 1
     for i := 0; i < n; i++ {
         c <- x
         x, y = y, x+y
     }
     close(c)
 }
 
 func main() {
     c := make(chan int, 10)
     go fibonacci(cap(c), c)
     for i := range c {
         fmt.Println(i)
     }
 }
 
Иногда возникают ситуация, когда канал по каким-то причинам задерживает отдачу. В этом случае можно использовать timeout для ограничения времени отклика. В следующем примере делается http-запрос. Создаются два канала - один для ответа и второй для ошибки. С помощью оператора select обрабатывается возможные сценарии, когда ответ на запрос из канала может вернуть ошибку либо задержаться:

 	response := make(chan *http.Response, 1)
 	errors   := make(chan *error)
 
 	go func() {
 		resp, err := http.Get("http://iakovlev.org/")
 		if err != nil {
 			errors <- &err
 		}
 		response <- resp
 	}()
 	for {
 		select {
 		case r := <-response:
 			fmt.Printf("%s", r.Body)
 			return
 		case err := <-errors:
 			log.Fatal(err)
 		case <-time.After(100 * time.Millisecond):
 			fmt.Printf("Timed out!")
 			return
 		}
 	}    
 
В гоу concurrency - это не параллелизм. Для обьяснения этого парадокса рассмотрим код, в котором не будет ни одной рутины, в нем будут две анонимных функции, которые по всем законам жанра будут выполняться строго последовательно - именно в том порядке, в котором они прописаны:

     fmt.Println("Start")
     func() {
 	time.Sleep(1000000 * time.Microsecond)
         for char := 'a'; char < 'a'+26; char++ {
             fmt.Printf("%c ", char)
         }
     }()
     println()
     func() {
 
         for number := 1; number < 27; number++ {
             fmt.Printf("%d ", number)
         }
     }()
     println()
     fmt.Println("End")
 
 Вывод:
 Start
 a b c d e f g h i j k l m n o p q r s t u v w x y z 
 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
 End
 
Вывод вполне предсказуем, при этом обратите внимание на тот факт, что во время паузы в первой анонимной функции программа будет заблокирована на период ожидания.
Перепишем этот пример - анонимные функции сделаем рутинами с помощью go и добавим синхронизацию:

     var wg sync.WaitGroup
     wg.Add(2)
     fmt.Println("Start")
     go func() {
 	defer wg.Done()
 	time.Sleep(1000000 * time.Microsecond)
         for char := 'a'; char < 'a'+26; char++ {
             fmt.Printf("%c ", char)
         }
     }()
     go func() {
 	defer wg.Done()
         for number := 1; number < 27; number++ {
             fmt.Printf("%d ", number)
         }
 	println()
     }()
     wg.Wait()
     println()
     fmt.Println("End")
 Вывод:
 Start
 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
 a b c d e f g h i j k l m n o p q r s t u v w x y z 
 End
 
Порядок работы и вывод существенно меняются - сначала выполнится вторая рутина, а потом первая, т.е. разница в том, что теперь первая рутина не блокирует ход программы, у нас есть возможность автоматически переключиться между рутинами в том случае, если одна занята ожиданием. Т.е. concurrency дает возможность использовать ресурсы на сто процентов.
Когда мы говорим, что concurrency - это не параллелизм, мы имеем ввиду тот факт, что в гоу по умолчанию программа выполняется одним процессором. Если мы ходим использовать больше одного ядра, т.е. сделать программу параллельной, мы должны сделать обьявление - runtime.GOMAXPROCS(2):

     runtime.GOMAXPROCS(2)
     var wg sync.WaitGroup
     wg.Add(2)
     fmt.Println("Start")
     go func() {
 	defer wg.Done()
         for char := 'a'; char < 'a'+26; char++ {
             fmt.Printf("%c ", char)
         }
     }()
     go func() {
 	defer wg.Done()
         for number := 1; number < 27; number++ {
             fmt.Printf("%d ", number)
         }
 	println()
     }()
     wg.Wait()
     println()
     fmt.Println("End")
 Вывод:
 Start
 1 2 3 4 5 a 6 b c d e f g h i j k l m n o p 7 q r 8 s t u v w x y z 9 10 ... 
 End
 
В данном случае это будет уже программа с двумя рутинами, выполняемыми параллельно на разных ядрах, и каждый раз будет генерироваться непредсказуемый вывод.

Race condition - ситуация, когда многопоточная программа дает непредсказуемый результат.
Такой результат зачастую невозможно воспроизвести.
Одна из разновидностей - data race - случается тогда, когда две рутины получают одновременный доступ на запись к одной и той же переменной. Есть три варианта для обхода data race:
1. Сделать доступ этой переменной только на чтение - правда, в большинстве случаев это не подходит
2. Сделать доступ на запись в переменную только для одной рутины
3. Можно использовать мьютекс, у которого есть методы блокирования и разблокирования:

     mu sync.Mutex 
     mu.Lock()
     ...
     mu.Unlock()
 
Область кода между Lock() и Unlock() называется критической секцией. Функции с такими секциями называются мониторами.
Иногда возникает ситуация, когда вызов Unlock() полезно ложить в отложенную функцию defer, чтобы быть уверенным в гарантированной разблокировке - в следующем примере разблокировка произойдет уже после того, как функция вернет значение - поэтому это concurrency-safe функция, даже если в функции случится паника и там будет стоять вызов рекавери, разблокировка все равно сработает :

 func Balance() int {
   mu.Lock()
   defer mu.Unlock()
   return balance
 }
 
 
Простая ситуация, когда возможен дедлок: есть две функции, одна вызывается внутри другой, обе функции используют один и тот же мьютекс для блокировки, при вызове вложенной функции и произойдет дедлок, потому что у мьютекса нельзя вызывать два раза подряд один и тот же метод
mu.Lock()
Есть специальный тип мьютекса с эксклюзивным доступом на запись, но он работает медленнее обычного:
var mu sync.RWMutex
В пакете sync есть еще один обьект
sync.Once
Он включает в себя одновременно мьютекс и булевский флаг. У него есть метод
once.Do(oblect)
При первом вызове этого метода флаг установится в true, при дальнейших вызовах чтение расшаренного в памяти обьекта будет оптимизировано.

В майнфреймовых языках считается правилом хорошего тона использование блокировок с помощью мьютексов. Гоу также предоставляет аналогичную возможность. Кроме этого, в гоу есть другая возможность для синхронизации - для этого можно использовать каналы.
В следующем примере показано, как работать с "глобальным" словарем, при этом мьютексы не используются. В словаре создается два счетчика, затем они наращиваются, после чего выводятся на экран. На самом деле словарь здесь не глобален, а локален внутри одной функции, но виден он отовсюду. Канал, по которому происходит обмен, предоставляет доступ к этому словарю из любой точки программы. Этот небуферизованный - синхронный - канал является одновременно синронизатором доступа к словарю:

 package main
 
 import "sync"
 
 type request struct {
 	key int
 	value int
 	op string
 	ret chan int
 } 
 
 func set(c chan request, key int)  {
 	c <- request{key, 0, "set", nil}
 }
 
 func get(c chan request, key int) int {
 	result := make(chan int)
 	c <- request{key, 0, "get", result}
 	return <-result
 	
 }
 
 func add(c chan request, key int)  {
 	c <- request{key, 0, "add", nil}
 }
 
 func runMap(c chan request) {
 	m := make(map[int] int)
 	for {
 		req := <- c
 		switch req.op {
 		  case "set":
 		    m[req.key] = 0
 		  case "get":
 		    req.ret <- m[req.key]
 		  case "add":
 		    m[req.key] += 1
 		}
 	}
 }
 
 func main() {
 	m := make(chan request)
 	
 	go runMap(m)
 	set(m, 1 )
 	set(m, 2)
 	
 	var wg sync.WaitGroup
 	wg.Add(2)
 	go func() {
 	  defer wg.Done()	  
 	  for i := 0; i< 100000; i++ {
 	    add(m, 1)
 	  }
 	}()
 	
 	go func() {
 	  defer wg.Done()	  
 	  for i := 0; i< 200000; i++ {
 	    add(m, 2)
 	  }
 	}()
 	wg.Wait()
 
  
 	println(get(m, 1))
 	println(get(m, 2))
 	
 	  
 }
 



Оставьте свой комментарий !

Ваше имя:
Комментарий:
Оба поля являются обязательными

 Автор  Комментарий к данной статье