sumSquares
function call are requested concurrently. Each of the two channel receive operations will block until a send operation performs on the corresponding channel. It takes about three seconds instead of six seconds to return the final result.
package main
import (
"time"
"math/rand"
"fmt"
)
func longTimeRequest() <-chan int32 {
r := make(chan int32)
go func() {
// Simulate a workload.
time.Sleep(time.Second * 3)
r <- rand.Int31n(100)
}()
return r
}
func sumSquares(a, b int32) int32 {
return a*a + b*b
}
func main() {
rand.Seed(time.Now().UnixNano()) // needed before Go 1.20
a, b := longTimeRequest(), longTimeRequest()
fmt.Println(sumSquares(<-a, <-b))
}
sumSquares
function call are requested concurrently. Different to the last example, the longTimeRequest
function takes a send-only channel as parameter instead of returning a receive-only channel result.
package main
import (
"time"
"math/rand"
"fmt"
)
func longTimeRequest(r chan<- int32) {
// Simulate a workload.
time.Sleep(time.Second * 3)
r <- rand.Int31n(100)
}
func sumSquares(a, b int32) int32 {
return a*a + b*b
}
func main() {
rand.Seed(time.Now().UnixNano()) // needed before Go 1.20
ra, rb := make(chan int32), make(chan int32)
go longTimeRequest(ra)
go longTimeRequest(rb)
fmt.Println(sumSquares(<-ra, <-rb))
}
...
// The channel can be buffered or not.
results := make(chan int32, 2)
go longTimeRequest(results)
go longTimeRequest(results)
fmt.Println(sumSquares(<-results, <-results))
}
package main
import (
"fmt"
"time"
"math/rand"
)
func source(c chan<- int32) {
ra, rb := rand.Int31(), rand.Intn(3) + 1
// Sleep 1s/2s/3s.
time.Sleep(time.Duration(rb) * time.Second)
c <- ra
}
func main() {
rand.Seed(time.Now().UnixNano()) // needed before Go 1.20
startTime := time.Now()
// c must be a buffered channel.
c := make(chan int32, 5)
for i := 0; i < cap(c); i++ {
go source(c)
}
// Only the first response will be used.
rnd := <- c
fmt.Println(time.Since(startTime))
fmt.Println(rnd)
}
struct{v T; err error}
or a blank interface type as the channel element type.
struct{}
as the element types of the notification channels, for the size of type struct{}
is zero, hence values of struct{}
doesn't consume memory.
done
is used as a signal channel to do notifications.
package main
import (
"crypto/rand"
"fmt"
"os"
"sort"
)
func main() {
values := make([]byte, 32 * 1024 * 1024)
if _, err := rand.Read(values); err != nil {
fmt.Println(err)
os.Exit(1)
}
done := make(chan struct{}) // can be buffered or not
// The sorting goroutine
go func() {
sort.Slice(values, func(i, j int) bool {
return values[i] < values[j]
})
// Notify sorting is done.
done <- struct{}{}
}()
// do some other things ...
<- done // waiting here for notification
fmt.Println(values[0], values[len(values)-1])
}
package main
import (
"fmt"
"time"
)
func main() {
done := make(chan struct{})
// The capacity of the signal channel can
// also be one. If this is true, then a
// value must be sent to the channel before
// creating the following goroutine.
go func() {
fmt.Print("Hello")
// Simulate a workload.
time.Sleep(time.Second * 2)
// Receive a value from the done
// channel, to unblock the second
// send in main goroutine.
<- done
}()
// Blocked here, wait for a notification.
done <- struct{}{}
fmt.Println(" world!")
}
package main
import "log"
import "time"
type T = struct{}
func worker(id int, ready <-chan T, done chan<- T) {
<-ready // block here and wait a notification
log.Print("Worker#", id, " starts.")
// Simulate a workload.
time.Sleep(time.Second * time.Duration(id+1))
log.Print("Worker#", id, " job done.")
// Notify the main goroutine (N-to-1),
done <- T{}
}
func main() {
log.SetFlags(0)
ready, done := make(chan T), make(chan T)
go worker(0, ready, done)
go worker(1, ready, done)
go worker(2, ready, done)
// Simulate an initialization phase.
time.Sleep(time.Second * 3 / 2)
// 1-to-N notifications.
ready <- T{}; ready <- T{}; ready <- T{}
// Being N-to-1 notified.
<-done; <-done; <-done
}
sync.WaitGroup
to do N-to-1 notifications, and we do 1-to-N notifications by close channels. Please read the next sub-section for details.
ready <- struct{}{}
in the last example with one channel close operation close(ready)
to do an 1-to-N notifications.
...
close(ready) // broadcast notifications
...
context
package uses this feature to confirm cancellations.
package main
import (
"fmt"
"time"
)
func AfterDuration(d time.Duration) <- chan struct{} {
c := make(chan struct{}, 1)
go func() {
time.Sleep(d)
c <- struct{}{}
}()
return c
}
func main() {
fmt.Println("Hi!")
<- AfterDuration(time.Second)
fmt.Println("Hello!")
<- AfterDuration(time.Second)
fmt.Println("Bye!")
}
After
function in the time
standard package provides the same functionality, with a much more efficient implementation. We should use that function instead to make the code look clean.
<-time.After(aDuration)
will make the current goroutine enter blocking state, but a time.Sleep(aDuration)
function call will not.
<-time.After(aDuration)
is often used in the timeout mechanism which will be introduced below.
sync
standard package.
package main
import "fmt"
func main() {
// The capacity must be one.
mutex := make(chan struct{}, 1)
counter := 0
increase := func() {
mutex <- struct{}{} // lock
counter++
<-mutex // unlock
}
increase1000 := func(done chan<- struct{}) {
for i := 0; i < 1000; i++ {
increase()
}
done <- struct{}{}
}
done := make(chan struct{})
go increase1000(done)
go increase1000(done)
<-done; <-done
fmt.Println(counter) // 2000
}
...
func main() {
mutex := make(chan struct{}, 1)
mutex <- struct{}{} // this line is needed.
counter := 0
increase := func() {
<-mutex // lock
counter++
mutex <- struct{}{} // unlock
}
...
N
, then it can be viewed as a lock which can have most N
owners at any time. Binary semaphores (mutexes) are special counting semaphores, each of binary semaphores can have at most one owner at any time.
package main
import (
"log"
"time"
"math/rand"
)
type Seat int
type Bar chan Seat
func (bar Bar) ServeCustomer(c int) {
log.Print("customer#", c, " enters the bar")
seat := <- bar // need a seat to drink
log.Print("++ customer#", c, " drinks at seat#", seat)
time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
log.Print("-- customer#", c, " frees seat#", seat)
bar <- seat // free seat and leave the bar
}
func main() {
rand.Seed(time.Now().UnixNano()) // needed before Go 1.20
// the bar has 10 seats.
bar24x7 := make(Bar, 10)
// Place seats in an bar.
for seatId := 0; seatId < cap(bar24x7); seatId++ {
// None of the sends will block.
bar24x7 <- Seat(seatId)
}
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
go bar24x7.ServeCustomer(customerId)
}
// sleeping != blocking
for {time.Sleep(time.Second)}
}
for
loop in the main
function is to avoid the program exiting. There is a better way, which will be introduced below, to do the job.
... // same code as the above example
func (bar Bar) ServeCustomerAtSeat(c int, seat Seat) {
log.Print("++ customer#", c, " drinks at seat#", seat)
time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
log.Print("-- customer#", c, " frees seat#", seat)
bar <- seat // free seat and leave the bar
}
func main() {
rand.Seed(time.Now().UnixNano()) // needed before Go 1.20
bar24x7 := make(Bar, 10)
for seatId := 0; seatId < cap(bar24x7); seatId++ {
bar24x7 <- Seat(seatId)
}
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
// Need a seat to serve next customer.
seat := <- bar24x7
go bar24x7.ServeCustomerAtSeat(customerId, seat)
}
for {time.Sleep(time.Second)}
}
... // same code as the above example
func (bar Bar) ServeCustomerAtSeat(consumers chan int) {
for c := range consumers {
seatId := <- bar
log.Print("++ customer#", c, " drinks at seat#", seatId)
time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
log.Print("-- customer#", c, " frees seat#", seatId)
bar <- seatId // free seat and leave the bar
}
}
func main() {
rand.Seed(time.Now().UnixNano()) // needed before Go 1.20
bar24x7 := make(Bar, 10)
for seatId := 0; seatId < cap(bar24x7); seatId++ {
bar24x7 <- Seat(seatId)
}
consumers := make(chan int)
for i := 0; i < cap(bar24x7); i++ {
go bar24x7.ServeCustomerAtSeat(consumers)
}
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
consumers <- customerId
}
}
bar24x7
semaphore is not essential at all:
... // same code as the above example
func ServeCustomer(consumers chan int) {
for c := range consumers {
log.Print("++ customer#", c, " drinks at the bar")
time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
log.Print("-- customer#", c, " leaves the bar")
}
}
func main() {
rand.Seed(time.Now().UnixNano()) // needed before Go 1.20
const BarSeatCount = 10
consumers := make(chan int)
for i := 0; i < BarSeatCount; i++ {
go ServeCustomer(consumers)
}
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
consumers <- customerId
}
}
package main
import (
"log"
"time"
"math/rand"
)
type Customer struct{id int}
type Bar chan Customer
func (bar Bar) ServeCustomer(c Customer) {
log.Print("++ customer#", c.id, " starts drinking")
time.Sleep(time.Second * time.Duration(3 + rand.Intn(16)))
log.Print("-- customer#", c.id, " leaves the bar")
<- bar // leaves the bar and save a space
}
func main() {
rand.Seed(time.Now().UnixNano()) // needed before Go 1.20
// The bar can serve most 10 customers
// at the same time.
bar24x7 := make(Bar, 10)
for customerId := 0; ; customerId++ {
time.Sleep(time.Second * 2)
customer := Customer{customerId}
// Wait to enter the bar.
bar24x7 <- customer
go bar24x7.ServeCustomer(customer)
}
for {time.Sleep(time.Second)}
}
package main
import "fmt"
import "time"
import "os"
type Ball uint64
func Play(playerName string, table chan Ball) {
var lastValue Ball = 1
for {
ball := <- table // get the ball
fmt.Println(playerName, ball)
ball += lastValue
if ball < lastValue { // overflow
os.Exit(0)
}
lastValue = ball
table <- ball // bat back the ball
time.Sleep(time.Second)
}
}
func main() {
table := make(chan Ball)
go func() {
table <- 1 // throw ball on table
}()
go Play("A:", table)
Play("B:", table)
}
chan chan<- int
is a channel type which element type is a send-only channel type chan<- int
.
package main
import "fmt"
var counter = func (n int) chan<- chan<- int {
requests := make(chan chan<- int)
go func() {
for request := range requests {
if request == nil {
n++ // increase
} else {
request <- n // take out
}
}
}()
// Implicitly converted to chan<- (chan<- int)
return requests
}(0)
func main() {
increase1000 := func(done chan<- struct{}) {
for i := 0; i < 1000; i++ {
counter <- nil
}
done <- struct{}{}
}
done := make(chan struct{})
go increase1000(done)
go increase1000(done)
<-done; <-done
request := make(chan int, 1)
counter <- request
fmt.Println(<-request) // 2000
}
len
and cap
to check the length and capacity of a channel. However, we seldom do this in practice. The reason for we seldom use the len
function to check the length of a channel is the length of the channel may have changed after the len
function call returns. The reason for we seldom use the cap
function to check the capacity of a channel is the capacity of the channel is often known or not important.
c
which no ones will send values to any more, then we can use the following code to receive remaining values.
// Assume the current goroutine is the only
// goroutine tries to receive values from
// the channel c at present.
for len(c) > 0 {
value := <-c
// use value ...
}
c
until it is full without entering blocking state at the end, and the goroutine is the only sender of the channel, then we can use the following code to do this job.
for len(c) < cap(c) {
c <- aValue
}
select{}
to block the current goroutine for ever. This is the simplest use case of the select mechanism. In fact, some uses of for {time.Sleep(time.Second)}
in some above examples can be replaced with select{}
.
select{}
is used to prevent the main goroutine from exiting, for if the main goroutine exits, the whole program will also exit.
package main
import "runtime"
func DoSomething() {
for {
// do something ...
runtime.Gosched() // avoid being greedy
}
}
func main() {
go DoSomething()
go DoSomething()
select{}
}
select{}
way is the simplest one.
select
block with one default
branch and only one case
branch is called a try-send or try-receive channel operation, depending on whether the channel operation following the case
keyword is a channel send or receive operation.
case
keyword is a send operation, then the select
block is called as try-send operation. If the send operation would block, then the default
branch will get executed (fail to send), otherwise, the send succeeds and the only case
branch will get executed.
case
keyword is a receive operation, then the select
block is called as try-receive operation. If the receive operation would block, then the default
branch will get executed (fail to receive), otherwise, the receive succeeds and the only case
branch will get executed.
package main
import "fmt"
func main() {
type Book struct{id int}
bookshelf := make(chan Book, 3)
for i := 0; i < cap(bookshelf) * 2; i++ {
select {
case bookshelf <- Book{id: i}:
fmt.Println("succeeded to put book", i)
default:
fmt.Println("failed to put book")
}
}
for i := 0; i < cap(bookshelf) * 2; i++ {
select {
case book := <-bookshelf:
fmt.Println("succeeded to get book", book.id)
default:
fmt.Println("failed to get book")
}
}
}
succeed to put book 0 succeed to put book 1 succeed to put book 2 failed to put book failed to put book failed to put book succeed to get book 0 succeed to get book 1 succeed to get book 2 failed to get book failed to get book failed to get book
T
the element type of the corresponding channel type.
func IsClosed(c chan T) bool {
select {
case <-c:
return true
default:
}
return false
}
...
// Can serve most 10 customers at the same time
bar24x7 := make(Bar, 10)
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
customer := Consumer{customerId}
select {
case bar24x7 <- customer: // try to enter the bar
go bar24x7.ServeConsumer(customer)
default:
log.Print("customer#", customerId, " goes elsewhere")
}
}
...
package main
import (
"fmt"
"math/rand"
"time"
)
func source(c chan<- int32) {
ra, rb := rand.Int31(), rand.Intn(3)+1
// Sleep 1s, 2s or 3s.
time.Sleep(time.Duration(rb) * time.Second)
select {
case c <- ra:
default:
}
}
func main() {
rand.Seed(time.Now().UnixNano()) // needed before Go 1.20
// The capacity should be at least 1.
c := make(chan int32, 1)
for i := 0; i < 5; i++ {
go source(c)
}
rnd := <-c // only the first response is used
fmt.Println(rnd)
}
select
code block to receive the source responses at the same time. For example,
package main
import (
"fmt"
"math/rand"
"time"
)
func source() <-chan int32 {
// c must be a buffered channel.
c := make(chan int32, 1)
go func() {
ra, rb := rand.Int31(), rand.Intn(3)+1
time.Sleep(time.Duration(rb) * time.Second)
c <- ra
}()
return c
}
func main() {
rand.Seed(time.Now().UnixNano()) // needed before Go 1.20
var rnd int32
// Blocking here until one source responses.
select{
case rnd = <-source():
case rnd = <-source():
case rnd = <-source():
}
fmt.Println(rnd)
}
select
code block is executed. This is a memory leak case.
func requestWithTimeout(timeout time.Duration) (int, error) {
c := make(chan int)
// May need a long time to get the response.
go doRequest(c)
select {
case data := <-c:
return data, nil
case <-time.After(timeout):
return 0, errors.New("timeout")
}
}
package main
import "fmt"
import "time"
func Tick(d time.Duration) <-chan struct{} {
// The capacity of c is best set as one.
c := make(chan struct{}, 1)
go func() {
for {
time.Sleep(d)
select {
case c <- struct{}{}:
default:
}
}
}()
return c
}
func main() {
t := time.Now()
for range Tick(time.Second) {
fmt.Println(time.Since(t))
}
}
Tick
function in the time
standard package provides the same functionality, with a much more efficient implementation. We should use that function instead to make code look clean and run efficiently.
package main
import "fmt"
import "time"
type Request interface{}
func handle(r Request) {fmt.Println(r.(int))}
const RateLimitPeriod = time.Minute
const RateLimit = 200 // most 200 requests in one minute
func handleRequests(requests <-chan Request) {
quotas := make(chan time.Time, RateLimit)
go func() {
tick := time.NewTicker(RateLimitPeriod / RateLimit)
defer tick.Stop()
for t := range tick.C {
select {
case quotas <- t:
default:
}
}
}()
for r := range requests {
<-quotas
go handle(r)
}
}
func main() {
requests := make(chan Request)
go handleRequests(requests)
// time.Sleep(time.Minute)
for i := 0; ; i++ {requests <- i}
}
case
operations of a select
code block to affect the branch selection in the select
code block.
nil
. The case
branch corresponding the nil channel will not get selected for sure. We can think such case
branches are in off status. At the end of each loop step, the on/off statuses of the two case
branches are switched.
package main
import "fmt"
import "time"
import "os"
type Ball uint8
func Play(playerName string, table chan Ball, serve bool) {
var receive, send chan Ball
if serve {
receive, send = nil, table
} else {
receive, send = table, nil
}
var lastValue Ball = 1
for {
select {
case send <- lastValue:
case value := <- receive:
fmt.Println(playerName, value)
value += lastValue
if value < lastValue { // overflow
os.Exit(0)
}
lastValue = value
}
// Switch on/off.
receive, send = send, receive
time.Sleep(time.Second)
}
}
func main() {
table := make(chan Ball)
go Play("A:", table, false)
Play("B:", table, true)
}
1212...
when running. It has not much usefulness in practice. It is shown here just for learning purpose.
package main
import "fmt"
import "time"
func main() {
for c := make(chan struct{}, 1); true; {
select {
case c <- struct{}{}:
fmt.Print("1")
case <-c:
fmt.Print("2")
}
time.Sleep(time.Second)
}
}
case
branch in a select
code block to increase the execution possibility weigh of the corresponding code.
package main
import "fmt"
func main() {
foo, bar := make(chan struct{}), make(chan struct{})
close(foo); close(bar) // for demo purpose
x, y := 0.0, 0.0
f := func(){x++}
g := func(){y++}
for i := 0; i < 100000; i++ {
select {
case <-foo: f()
case <-foo: f()
case <-bar: g()
}
}
fmt.Println(x/y) // about 2
}
f
function being called is about the double of the g
function being called.
select
block is fixed, we can use the functionalities provided in the reflect
standard package to construct a select block at run time. The dynamically created select block can have an arbitrary number of case branches. But please note, the reflection way is less efficient than the fixed way.
reflect
standard package also provides TrySend
and TryRecv
functions to implement one-case-plus-default select blocks.
import (
"crypto/rand"
"encoding/binary"
)
func RandomGenerator() <-chan uint64 {
c := make(chan uint64)
go func() {
rnds := make([]byte, 8)
for {
_, err := rand.Read(rnds)
if err != nil {
close(c)
break
}
c <- binary.BigEndian.Uint64(rnds)
}
}()
return c
}
int64
, then the following function will aggregate an arbitrary number of data streams into one.
func Aggregator(inputs ...<-chan uint64) <-chan uint64 {
out := make(chan uint64)
for _, in := range inputs {
go func(in <-chan uint64) {
for {
out <- <-in // <=> out <- (<-in)
}
}(in)
}
return out
}
import "sync"
func Aggregator(inputs ...<-chan uint64) <-chan uint64 {
output := make(chan uint64)
var wg sync.WaitGroup
for _, in := range inputs {
wg.Add(1)
go func(int <-chan uint64) {
defer wg.Done()
// If in is closed, then the
// loop will ends eventually.
for x := range in {
output <- x
}
}(in)
}
go func() {
wg.Wait()
close(output)
}()
return output
}
select
block to aggregate these data streams.
// Assume the number of input stream is two.
...
output := make(chan uint64)
go func() {
inA, inB := inputs[0], inputs[1]
for {
select {
case v := <- inA: output <- v
case v := <- inB: output <- v
}
}
}
...
func Divisor(input <-chan uint64, outputs ...chan<- uint64) {
for _, out := range outputs {
go func(o chan<- uint64) {
for {
o <- <-input // <=> o <- (<-input)
}
}(out)
}
}
uint64
values from one stream and one uint64
value from another stream compose one new uint64
value. Surely, these stream channel element types are different generally in practice.
func Composer(inA, inB <-chan uint64) <-chan uint64 {
output := make(chan uint64)
go func() {
for {
a1, b, a2 := <-inA, <-inB, <-inA
output <- a1 ^ b & a2
}
}()
return output
}
func Duplicator(in <-chan uint64) (<-chan uint64, <-chan uint64) {
outA, outB := make(chan uint64), make(chan uint64)
go func() {
for x := range in {
outA <- x
outB <- x
}
}()
return outA, outB
}
uint64
value.
func Calculator(in <-chan uint64, out chan uint64) (<-chan uint64) {
if out == nil {
out = make(chan uint64)
}
go func() {
for x := range in {
out <- ^x
}
}()
return out
}
import "math/big"
func Filter0(input <-chan uint64, output chan uint64) <-chan uint64 {
if output == nil {
output = make(chan uint64)
}
go func() {
bigInt := big.NewInt(0)
for x := range input {
bigInt.SetUint64(x)
if bigInt.ProbablyPrime(1) {
output <- x
}
}
}()
return output
}
func Filter(input <-chan uint64) <-chan uint64 {
return Filter0(input, nil)
}
import "fmt"
func Printer(input <-chan uint64) {
for x := range input {
fmt.Println(x)
}
}
package main
... // the worker functions declared above.
func main() {
Printer(
Filter(
Calculator(
RandomGenerator(), nil,
),
),
)
}
package main
... // the worker functions declared above.
func main() {
filterA := Filter(RandomGenerator())
filterB := Filter(RandomGenerator())
filterC := Filter(RandomGenerator())
filter := Aggregator(filterA, filterB, filterC)
calculatorA := Calculator(filter, nil)
calculatorB := Calculator(filter, nil)
calculator := Aggregator(calculatorA, calculatorB)
Printer(calculator)
}
package main
... // the worker functions declared above.
func main() {
c1 := make(chan uint64, 100)
Filter0(RandomGenerator(), c1) // filterA
Filter0(RandomGenerator(), c1) // filterB
Filter0(RandomGenerator(), c1) // filterC
c2 := make(chan uint64, 100)
Calculator(c1, c2) // calculatorA
Calculator(c1, c2) // calculatorB
Printer(c2)
}
The Go 101 project is hosted on Github. Welcome to improve Go 101 articles by submitting corrections for all kinds of mistakes, such as typos, grammar errors, wording inaccuracies, description flaws, code bugs and broken links.
If you would like to learn some Go details and facts every serveral days, please follow Go 101's official Twitter account @zigo_101.
reflect
standard package.sync
standard package.sync/atomic
standard package.