Channel Use Cases

Before reading this article, please read the article channels in Go, which explains channel types and values in detail. New gophers may need to read that article and the current one several times to master Go channel programming.

The rest of this article shows all kinds of channel use cases. I hope this article will convince you that

Please note that the intention of this article is to show as many channel use cases as possible. We should know that channels are not the only concurrency synchronization technique supported in Go, and in many circumstances a channel may not be the best solution. Please read atomic operations and some other synchronization techniques for more concurrency synchronization techniques in Go.

Many Go channel articles classify channel use cases into pattern categories, such as the request-response pattern, the event-based notification pattern, and the data flow pattern. This article doesn't follow that approach, for the boundaries between those patterns are somewhat blurry, and many use cases belong to multiple pattern categories. This article simply shows all kinds of use cases.

Use Channels As Futures/Promises

By using goroutines and channels together, we can achieve the same effect as the future and promise asynchronous programming features supported in many other languages.

Futures and promises are often associated with requests and responses. Generally, receive-only channels can be viewed as futures (from the request side), and send-only channels can be viewed as promises (from the response side).

Return Receive-Only Channels As Results

In the following example, the values of the two arguments of the sumSquares function call are requested concurrently. As the two channels are both unbuffered, each of the two channel receive operations will block until a send operation is performed on the corresponding channel. It takes about three seconds, instead of six seconds, to return the final result.

package main

import (
	"time"
	"math/rand"
	"fmt"
)

func longTimeRequest() <-chan int32 {
	r := make(chan int32)

	// This goroutine treats the channel r as a promise.
	go func() {
		time.Sleep(time.Second * 3) // simulate a workload
		r <- rand.Int31n(100)
	}()

	return r // return r as a future
}

func sumSquares(a, b int32) int32 {
	return a*a + b*b
}

func main() {
	rand.Seed(time.Now().UnixNano())

	a, b := longTimeRequest(), longTimeRequest()
	fmt.Println(sumSquares(<-a, <-b))
}

Pass Send-Only Channels As Arguments

As in the last example, in the following example the values of the two arguments of the sumSquares function call are requested concurrently. Unlike the last example, the longTimeRequest function takes a send-only channel as a parameter instead of returning a receive-only channel result.

package main

import (
	"time"
	"math/rand"
	"fmt"
)

// Channel r is viewed as a promise by this function.
func longTimeRequest(r chan<- int32) {
	time.Sleep(time.Second * 3)
	r <- rand.Int31n(100)
}

func sumSquares(a, b int32) int32 {
	return a*a + b*b
}

func main() {
	rand.Seed(time.Now().UnixNano())

	ra, rb := make(chan int32), make(chan int32)
	go longTimeRequest(ra)
	go longTimeRequest(rb)

	fmt.Println(sumSquares(<-ra, <-rb))
}

In fact, for the above specified example, we don't need two channels to transfer results. Using one channel is okay.
...

	results := make(chan int32, 2) // can be buffered or not
	go longTimeRequest(results)
	go longTimeRequest(results)

	fmt.Println(sumSquares(<-results, <-results))
}

This is a kind of data aggregation, which is introduced in its own section below.

The First Response Wins

This is an enhancement for the last use case.

Sometimes, a piece of data can be retrieved from several sources. Because of many factors, the response durations of these sources may vary a lot. Even for a given source, the response durations are not constant. To make the response duration as short as possible, we can send a request to every source in a separate goroutine. Only the first response will be used; the slower ones will be discarded.

Note, if there are n sources, the capacity of the communication channel must be at least n-1, to avoid the goroutines corresponding to the discarded responses blocking for ever.

package main

import (
	"fmt"
	"time"
	"math/rand"
)

func source(c chan<- int32) {
	ra, rb := rand.Int31(), rand.Intn(3) + 1
	time.Sleep(time.Duration(rb) * time.Second) // sleep 1s, 2s or 3s
	c <- ra
}

func main() {
	rand.Seed(time.Now().UnixNano())

	startTime := time.Now()
	c := make(chan int32, 5) // need a buffered channel
	for i := 0; i < cap(c); i++ {
		go source(c)
	}
	rnd := <- c // only the first response is used
	fmt.Println(time.Since(startTime))
	fmt.Println(rnd)
}

There are some other ways to implement the first-response-wins use case, by using the select mechanism and a buffered channel whose capacity is one. These other ways will be introduced below.

More Request-Response Variants

The parameter and result channels can be buffered so that the response sides don't need to wait for the request sides to take out the transferred values.

Sometimes, a request is not guaranteed to be answered with a normal value. For all kinds of reasons, an error may be returned instead. For such scenarios, we can use a struct type like struct{v T; err error} or a blank interface type as the channel element type.
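
The following is a minimal sketch of the struct approach. The result type and the request function are hypothetical names made up for this illustration, and the failure condition (a negative input) is only an assumption used to trigger the error path.

```go
package main

import (
	"errors"
	"fmt"
)

// result is a hypothetical element type which carries
// either a normal value or an error.
type result struct {
	v   int
	err error
}

// request is a hypothetical request function.
// It is assumed to fail for negative inputs.
func request(n int) <-chan result {
	r := make(chan result, 1) // buffered, so the sender never blocks
	go func() {
		if n < 0 {
			r <- result{err: errors.New("negative input")}
			return
		}
		r <- result{v: n * n}
	}()
	return r
}

func main() {
	for _, n := range []int{3, -1} {
		if res := <-request(n); res.err != nil {
			fmt.Println("error:", res.err)
		} else {
			fmt.Println("value:", res.v)
		}
	}
}
```

The request side checks the err field first and only uses the v field when err is nil.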

Sometimes, for some reason, the response may take much longer than expected to arrive, or may never arrive. We can use the timeout mechanism introduced below to handle such circumstances.

Sometimes, a sequence of values may be returned from the response side. This is a kind of the data flow mechanism described later in this article.

Use Channels For Notifications

Notifications can be viewed as special requests/responses in which the values sent to or received from channels are not important. Generally, we use the blank struct type struct{} as the element type of notification channels, for the size of type struct{} is zero, hence values of struct{} don't consume memory.

1-To-1 Notification By Sending A Value To A Channel

If there are no values to receive from a channel, then the next receive operation on the channel will block until another goroutine sends a value to the channel. So we can send a value to a channel to notify another goroutine which is waiting to receive a value from the same channel.

In the following example, the channel done is used as a signal channel to do notifications.
package main

import (
	"crypto/rand"
	"fmt"
	"os"
	"sort"
)

func main() {
	values := make([]byte, 32 * 1024 * 1024)
	if _, err := rand.Read(values); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	done := make(chan struct{})
	go func() { // the sorting goroutine
		sort.Slice(values, func(i, j int) bool {
			return values[i] < values[j]
		})
		done <- struct{}{} // notify sorting is done
	}()

	// do some other things ...

	<- done // waiting here for notification
	fmt.Println(values[0], values[len(values)-1])
}

1-To-1 Notification By Receiving A Value From A Channel

If the value buffer queue of a channel is full, a send operation on the channel will block until another goroutine receives a value from the channel. So we can receive a value from a channel to notify another goroutine which is waiting to send a value to the same channel. Generally, the channel should be an unbuffered channel.

This notification way is much less commonly used than the way introduced in the last example.

package main

import (
	"fmt"
	"time"
)

func main() {
	done := make(chan struct{}, 1) // the signal channel
	done <- struct{}{}             // fill the channel
	// Now the channel done is full. A new send will block.

	go func() {
		fmt.Print("Hello")
		time.Sleep(time.Second * 2) // simulate a workload
		<- done // receive a value from the done channel, to
		        // unblock the second send in main goroutine.
	}()

	// do some other things ...

	done <- struct{}{} // block here until a receive is made.
	fmt.Println(" world!")
}

In fact, the channel in the above example acts as a one-time binary semaphore, which is introduced in its own section below.

N-To-1 And 1-To-N Notifications

By extending the above two use cases a little, it is easy to do N-To-1 and 1-To-N notifications.
package main

import "log"
import "time"

func worker(id int, ready <-chan struct{}, done chan<- struct{}) {
	<-ready // wait until ready is closed
	log.Print("Worker#", id, " started to process.")
	time.Sleep(time.Second) // simulate a workload
	log.Print("Worker#", id, " finished its job.")
	done <- struct{}{} // notify the main goroutine (N-to-1)
}

func main() {
	log.SetFlags(0)

	ready, done := make(chan struct{}), make(chan struct{})
	go worker(0, ready, done)
	go worker(1, ready, done)
	go worker(2, ready, done)

	time.Sleep(time.Second * 2) // simulate an initialization phase
	// 1-to-N notifications.
	ready <- struct{}{}; ready <- struct{}{}; ready <- struct{}{}
	// Being N-to-1 notified.
	<-done; <-done; <-done
}

In fact, the ways to do 1-to-N and N-to-1 notifications introduced in this sub-section are not commonly used in practice. In practice, we often use sync.WaitGroup to do N-to-1 notifications, and we do 1-to-N notifications by closing channels. Please read the next sub-section for details.

Broadcast (1-To-N) Notifications by Closing A Channel

The way to do 1-to-N notifications shown in the last sub-section is seldom used in practice, for there is a better way. By making use of the feature that infinite values can be received from a closed channel, we can close a channel to broadcast notifications.

In the example in the last sub-section, we can replace the three channel send operations ready <- struct{}{} with one channel close operation close(ready) to do a 1-to-N notification.
...

	// ready <- struct{}{}; ready <- struct{}{}; ready <- struct{}{}
	close(ready) // broadcast notifications
...

Surely, we can also close a channel to do a 1-to-1 notification. In fact, this is the most commonly used way to do notifications in Go. The feature that infinite values can be received from a closed channel will be utilized in many other use cases introduced below.
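
The following sketch combines the two practical techniques mentioned above: a closed channel for the 1-to-N notification and a sync.WaitGroup for the N-to-1 notification. The runWorkers function and its counter are hypothetical, introduced only to make the result observable.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers starts n workers which all wait for the same broadcast.
// It returns how many workers finished their jobs.
func runWorkers(n int) int {
	ready := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	finished := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done() // N-to-1 notification
			<-ready         // wait until ready is closed
			mu.Lock()
			finished++ // stands in for a real workload
			mu.Unlock()
		}()
	}

	close(ready) // 1-to-N notification (broadcast)
	wg.Wait()    // wait until all workers are done
	return finished
}

func main() {
	fmt.Println(runWorkers(3)) // 3
}
```

One close operation unblocks every waiting worker, no matter how many there are, so the sender doesn't need to know the number of receivers.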

Use One Channel To Notify Multiple Times

Just as a response can return a sequence of results, we can also use one channel to do multiple 1-to-1 notifications. The logic is simple, so no examples are provided for such use cases.

Timer: Scheduled Notification

It is easy to use channels to implement one-time timers.

A custom one-time timer implementation:

package main

import (
	"fmt"
	"time"
)

func AfterDuration(d time.Duration) <-chan struct{} {
	c := make(chan struct{}, 1)
	go func() {
		time.Sleep(d)
		c <- struct{}{}
	}()
	return c
}

func main() {
	fmt.Println("Hi!")
	<- AfterDuration(time.Second)
	fmt.Println("Hello!")
	<- AfterDuration(time.Second)
	fmt.Println("Bye!")
}

In fact, the After function in the time standard package provides the same functionality, with a much more efficient implementation. We should use that function instead to make code look clean and run efficiently.

Please note, <-time.After(aDuration) will make the current goroutine enter blocking state, but a time.Sleep(aDuration) function call will not.

<-time.After(aDuration) is often used in the timeout mechanism which will be introduced below.

Use Channels As Mutex Locks

One of the above examples has shown that one-capacity buffered channels can be used as one-time binary semaphores. In fact, such channels can also be used as multi-time binary semaphores, a.k.a. mutex locks, though such mutex locks are not as efficient as the mutexes provided in the sync standard package.

There are two styles to use one-capacity buffered channels as mutex locks.
  1. Lock through a send, unlock through a receive.
  2. Lock through a receive, unlock through a send.

Here, only a lock-through-send example is shown. Please note that the capacity of the channel used as a mutex must be one.
package main

import "fmt"

func main() {
	mutex := make(chan struct{}, 1) // the capacity must be one

	counter := 0
	increase := func() {
		mutex <- struct{}{} // lock
		counter++
		<-mutex // unlock
	}

	increase1000 := func(done chan<- struct{}) {
		for i := 0; i < 1000; i++ {
			increase()
		}
		done <- struct{}{}
	}

	done := make(chan struct{})
	go increase1000(done)
	go increase1000(done)
	<-done; <-done
	fmt.Println(counter) // 2000
}
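
For comparison, the lock-through-receive style might look like the following sketch. The count function is a hypothetical wrapper added so that the result can be checked. The main difference from the example above is that the channel must be filled with one value before use, so that the mutex starts in the unlocked state.

```go
package main

import "fmt"

// count increases a shared counter from two goroutines, n times
// each, guarding it with a lock-through-receive channel mutex.
func count(n int) int {
	mutex := make(chan struct{}, 1) // the capacity must be one
	mutex <- struct{}{}             // needed: the mutex starts unlocked

	counter := 0
	increase := func() {
		<-mutex // lock
		counter++
		mutex <- struct{}{} // unlock
	}

	done := make(chan struct{})
	for g := 0; g < 2; g++ {
		go func() {
			for i := 0; i < n; i++ {
				increase()
			}
			done <- struct{}{}
		}()
	}
	<-done
	<-done
	return counter
}

func main() {
	fmt.Println(count(1000)) // 2000
}
```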

Use Channels As Counting Semaphores

Buffered channels with capacity larger than one can be used as counting semaphores. Counting semaphores can be viewed as multi-owner locks. If the capacity of a channel is N, then it can be viewed as a semaphore which can have at most N owners at any time. Binary semaphores (mutexes) are special counting semaphores, each of which can have at most one owner at any time. Counting semaphores are often used to limit throughput and enforce resource quotas.

Like using channels as mutexes, there are also two styles to acquire one piece of ownership of a channel semaphore.
  1. Acquire ownership through a send, release through a receive.
  2. Acquire ownership through a receive, release through a send.
An example of acquiring ownership through receiving values from a channel:
package main

import (
	"log"
	"time"
	"math/rand"
)

type Seat int
type Bar chan Seat

func (bar Bar) ServeConsumer(customerId int) {
	log.Print("-> consumer#", customerId, " enters the bar")
	seat := <- bar // need a seat to drink
	log.Print("consumer#", customerId, " drinks at seat#", seat)
	time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
	log.Print("<- consumer#", customerId, " frees seat#", seat)
	bar <- seat // free the seat and leave the bar
}

func main() {
	rand.Seed(time.Now().UnixNano())

	bar24x7 := make(Bar, 10) // the bar has 10 seats
	// Place seats in a bar.
	for seatId := 0; seatId < cap(bar24x7); seatId++ {
		bar24x7 <- Seat(seatId) // none of the sends will block
	}

	for customerId := 0; ; customerId++ {
		time.Sleep(time.Second)
		go bar24x7.ServeConsumer(customerId)
	}
	for {time.Sleep(time.Second)} // sleeping != blocking
}

In the above example, only the consumers who each get a seat can drink. So there will be at most ten consumers drinking at any given time.

The last for loop in the main function is to avoid the program exiting. There is a better way, which will be introduced below, to do the job.

Although at most ten consumers are drinking at any given time, more than ten consumers may be served at the bar at the same time, because some consumers are waiting for free seats. Although each consumer goroutine consumes far fewer resources than a system thread, the total resources consumed by a large number of goroutines are not negligible. So it is best to create a consumer goroutine only when a free seat is available.
... // same code as the above example

func (bar Bar) ServeConsumerAtSeat(customerId int, seat Seat) {
	log.Print("consumer#", customerId, " drinks at seat#", seat)
	time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
	log.Print("<- consumer#", customerId, " frees seat#", seat)
	bar <- seat // free the seat and leave the bar
}

func main() {
	rand.Seed(time.Now().UnixNano())

	bar24x7 := make(Bar, 10) // the bar has 10 seats
	// Place seats in a bar.
	for seatId := 0; seatId < cap(bar24x7); seatId++ {
		bar24x7 <- Seat(seatId) // none of the sends will block
	}

	for customerId := 0; ; customerId++ {
		time.Sleep(time.Second)
		seat := <- bar24x7 // need a seat to serve next consumer
		go bar24x7.ServeConsumerAtSeat(customerId, seat)
	}
	for {time.Sleep(time.Second)} // sleeping != blocking
}

There will be at most about ten live consumer goroutines coexisting in the above optimized version.

The style of acquiring ownership through sending is comparatively simpler. There is no seat-placing step.
package main

import (
	"log"
	"time"
	"math/rand"
)

type Consumer struct{id int}
type Bar chan Consumer

func (bar Bar) ServeConsumer(c Consumer) {
	log.Print("-> consumer#", c.id, " starts drinking")
	time.Sleep(time.Second * time.Duration(10 + rand.Intn(10)))
	log.Print("<- consumer#", c.id, " leaves the bar")
	<- bar // leave the bar and free a space
}

func main() {
	rand.Seed(time.Now().UnixNano())

	bar24x7 := make(Bar, 10) // can serve at most 10 consumers
	for customerId := 0; ; customerId++ {
		time.Sleep(time.Second)
		consumer := Consumer{customerId}
		bar24x7 <- consumer // try to enter the bar
		go bar24x7.ServeConsumer(consumer)
	}
	for {time.Sleep(time.Second)}
}

There is a variant of the channel semaphore use case. In the above two examples, although the throughput is limited, many requests (consumers) may be queuing. This is not always a good idea. Sometimes it would be better to suggest that queuing consumers go to other bars, by using the try-receive or try-send mechanisms introduced below.

Ping-Pong (Dialogue)

Sometimes, a piece of data will be processed, or messages will be communicated, back and forth between two goroutines. It is somewhat like a ping-pong game, or a dialogue between the goroutines.

The following example prints a series of Fibonacci numbers.
package main

import "fmt"
import "time"
import "os"

type Ball uint64

func Play(playerName string, table chan Ball) {
	var lastValue Ball = 1
	for {
		ball := <- table // get the ball
		fmt.Println(playerName, ball)
		ball += lastValue
		if ball < lastValue { // overflow
			os.Exit(0)
		}
		lastValue = ball
		table <- ball // bat back the ball
		time.Sleep(time.Second)
	}
}

func main() {
	table := make(chan Ball)
	go func() {
		table <- 1 // throw ball on table
	}()
	go Play("A:", table)
	Play("B:", table)
}

Channel Encapsulated In Channel

Sometimes, we can use a channel type as the element type of another channel type. In the following example, chan<- chan<- int is a send-only channel type whose element type is the send-only channel type chan<- int.
package main

import "fmt"

var counter = func (n int) chan<- chan<- int { // chan<- (chan<- int)
	requests := make(chan chan<- int) // chan (chan<- int)
	go func() {
		for request := range requests {
			if request == nil {
				n++ // increase
			} else {
				request <- n // retrieve
			}
		}
	}()
	return requests // implicitly converted to chan<- (chan<- int)
}(0)

func main() {
	increase1000 := func(done chan<- struct{}) {
		for i := 0; i < 1000; i++ {
			counter <- nil
		}
		done <- struct{}{}
	}

	done := make(chan struct{})
	go increase1000(done)
	go increase1000(done)
	<-done; <-done

	request := make(chan int, 1)
	counter <- request
	fmt.Println(<-request) // 2000
}

Although the encapsulation implementation here may not be the most efficient way for the above example, the use case may be useful in some other scenarios.

Check Lengths And Capacities Of Channels

We can use the built-in functions len and cap to check the length and capacity of a channel, though we seldom do this in practice. We seldom use the len function to check the length of a channel because the length may have changed by the time the len function call returns. We seldom use the cap function to check the capacity of a channel because the capacity is usually either known or unimportant.

However, there are some scenarios in which we need to use the two functions.

For example, sometimes we want to receive all the values buffered in a non-closed channel c to which no one will send values any more. Then we can use the following code to receive the remaining values.
for len(c) > 0 {
	value := <-c
	// use value ...
}

We can also use the try-receive mechanism introduced below to do the same job. The efficiencies of the two ways are almost the same.
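
As a preview of the try-receive alternative, the following sketch drains a channel with a select block that has a default branch. The drain function is a hypothetical helper. It assumes that c is not closed and that no other goroutine is sending to or receiving from c while it runs.

```go
package main

import "fmt"

// drain receives all the values currently buffered in c without
// blocking and returns them. It assumes c is not closed and is
// not used by other goroutines at the same time.
func drain(c chan int) []int {
	var values []int
	for {
		select {
		case v := <-c:
			values = append(values, v)
		default:
			return values // nothing is left to receive
		}
	}
}

func main() {
	c := make(chan int, 5)
	c <- 1
	c <- 2
	c <- 3
	fmt.Println(drain(c)) // [1 2 3]
}
```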

Sometimes, a goroutine may want to write some values to a buffered channel c until it is full without entering blocking state at the end, and the goroutine is the only sender of the channel, then we can use the following code to do this job.
for len(c) < cap(c) {
	c <- aValue
}

Block The Current Goroutine For Ever

The select mechanism is a unique feature in Go. It brings many patterns and tricks for concurrent programming. About the code execution rules of the select mechanism, please read the article channels in Go.

We can use a blank select block select{} to block the current goroutine for ever. This is the simplest use case of the select mechanism. In fact, some uses of for {time.Sleep(time.Second)} in some above examples can be replaced with select{}.

Generally, select{} is used to prevent the main goroutine from exiting, for if the main goroutine exits, the whole program will also exit.

An example:
package main

import "time"

func DoSomething() {
	for {
		// do something ...
		time.Sleep(time.Hour) // sleeping is not blocking
	}
}

func main() {
	go DoSomething()
	select{}
}

By the way, there are some other ways to make a goroutine stay in blocking state for ever. But the select{} way is the simplest one.

Try-Send And Try-Receive

In Go, a select block with one default branch and only one case branch is called a try-send or try-receive channel operation, depending on whether the channel operation following the case keyword is a channel send or receive operation.

Try-send and try-receive operations never block.

The standard Go compiler makes special optimizations for try-send and try-receive select blocks; their execution efficiency is much higher than that of multi-case select blocks.

The following example shows how try-send and try-receive work.
package main

import "fmt"

func main() {
	type Book struct{id int}
	bookshelf := make(chan Book, 3)

	for i := 0; i < cap(bookshelf) * 2; i++ {
		select {
		case bookshelf <- Book{id: i}:
			fmt.Println("succeed to put book", i)
		default:
			fmt.Println("failed to put book")
		}
	}

	for i := 0; i < cap(bookshelf) * 2; i++ {
		select {
		case book := <-bookshelf:
			fmt.Println("succeed to get book", book.id)
		default:
			fmt.Println("failed to get book")
		}
	}
}
The output of the above program:
succeed to put book 0
succeed to put book 1
succeed to put book 2
failed to put book
failed to put book
failed to put book
succeed to get book 0
succeed to get book 1
succeed to get book 2
failed to get book
failed to get book
failed to get book

The following sub-sections will show more try-send and try-receive use cases.

Check If An Unbuffered Channel Is Closed Without Blocking The Current Goroutine

Assume it is guaranteed that no values were ever sent to an unbuffered channel. Then we can use the following code to check whether or not the unbuffered channel is closed without blocking the current goroutine, where T is the element type of the corresponding channel type.
func IsClosed(c chan T) bool {
	select {
	case <-c:
		return true
	default:
	}
	return false
}

This way of checking whether an unbuffered channel is closed is widely used in Go concurrent programming.
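
A runnable instance of this check, with struct{} chosen as the element type, might look like the following sketch. The names isClosed and stop are only illustrative.

```go
package main

import "fmt"

// isClosed reports whether the signal channel c has been closed.
// It assumes that no values are ever sent to c.
func isClosed(c <-chan struct{}) bool {
	select {
	case <-c:
		return true
	default:
		return false
	}
}

func main() {
	stop := make(chan struct{})
	fmt.Println(isClosed(stop)) // false
	close(stop)
	fmt.Println(isClosed(stop)) // true
}
```

A worker goroutine can call such a function between two steps of a long job to find out whether it has been asked to stop.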

Peak Limiting

At the end of the use channels as counting semaphores section above, it is mentioned that it would be better to suggest that newly arriving consumers go to other bars if there are no available seats in the current bar. This is called peak limiting.

The following is a modified version of the last example in the use channels as counting semaphores section.
...
	bar24x7 := make(Bar, 10) // can serve at most 10 consumers
	for customerId := 0; ; customerId++ {
		time.Sleep(time.Second)
		consumer := Consumer{customerId}
		select {
		case bar24x7 <- consumer: // try to enter the bar
			go bar24x7.ServeConsumer(consumer)
		default:
			log.Print("consumer#", customerId, " goes elsewhere")
		}
	}
...

Another Way to Implement The First-Response-Wins Use Case

As mentioned above, we can use the select mechanism (try-send) with a buffered channel whose capacity is one (at least) to implement the first-response-wins use case. For example,
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func source(c chan<- int32) {
	ra, rb := rand.Int31(), rand.Intn(3)+1
	time.Sleep(time.Duration(rb) * time.Second) // sleep 1s, 2s or 3s
	select {
	case c <- ra:
	default:
	}
}

func main() {
	rand.Seed(time.Now().UnixNano())

	c := make(chan int32, 1) // the capacity should be at least 1
	for i := 0; i < 5; i++ {
		go source(c)
	}
	rnd := <-c // only the first response is used
	fmt.Println(rnd)
}

Please note, the capacity of the channel used in the above example must be at least one, so that the first send will not be missed if the receiver/request side is not ready in time.

The Third Way to Implement The First-Response-Wins Use Case

For a first-response-wins use case, if the number of sources is small, for example, two or three, we can use a select code block to receive the source responses at the same time. For example,
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func source() <-chan int32 {
	c := make(chan int32, 1) // must be a buffered channel
	go func() {
		ra, rb := rand.Int31(), rand.Intn(3)+1
		time.Sleep(time.Duration(rb) * time.Second)
		c <- ra
	}()
	return c
}

func main() {
	rand.Seed(time.Now().UnixNano())

	var rnd int32
	select{
	case rnd = <-source():
	case rnd = <-source():
	case rnd = <-source():
	}
	fmt.Println(rnd)
}

The two ways introduced in this and the last sub-sections can also be used to do any-to-1 notifications.

Timeout

In some request-response scenarios, for all kinds of reasons, a request may need a long time to get a response, and sometimes may never get one. For such circumstances, we should return an error message to the client side by using a timeout solution. Such a timeout solution can be implemented with the select mechanism.

The following code shows how to make a request with timeout.
func requestWithTimeout(timeout time.Duration) (int, error) {
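	// Buffered, so that after a timeout doRequest can still
	// send its (late) response and exit instead of blocking.
	c := make(chan int, 1)
	go doRequest(c) // may need a long time to respond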

	select {
	case data := <-c:
		return data, nil
	case <-time.After(timeout):
		return 0, errors.New("timeout")
	}
}
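
To see the timeout in action, here is a self-contained sketch. Its doRequest function is a hypothetical stand-in which simply sleeps for a given time before responding with a fixed value, and the extra cost parameter exists only so that both outcomes can be demonstrated.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// doRequest is a hypothetical request which needs
// the given time to produce its response.
func doRequest(c chan<- int, cost time.Duration) {
	time.Sleep(cost)
	c <- 42
}

func requestWithTimeout(cost, timeout time.Duration) (int, error) {
	// The buffer lets a late doRequest finish its send and exit.
	c := make(chan int, 1)
	go doRequest(c, cost)

	select {
	case data := <-c:
		return data, nil
	case <-time.After(timeout):
		return 0, errors.New("timeout")
	}
}

func main() {
	fmt.Println(requestWithTimeout(10*time.Millisecond, time.Second)) // 42 <nil>
	fmt.Println(requestWithTimeout(time.Second, 10*time.Millisecond)) // 0 timeout
}
```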

Ticker

We can use a buffered channel and try-send mechanism to implement a ticker.
package main

import "fmt"
import "time"

func Tick(d time.Duration) <-chan struct{} {
	c := make(chan struct{}, 1) // the capacity should be exactly one
	go func() {
		for {
			time.Sleep(d)
			select {
			case c <- struct{}{}:
			default:
			}
		}
	}()
	return c
}

func main() {
	t := time.Now()
	for range Tick(time.Second) {
		fmt.Println(time.Since(t))
	}
}

In fact, there is a Tick function in the time standard package which provides the same functionality, with a much more efficient implementation. We should use that function instead to make code look clean and run efficiently.
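
A minimal sketch of using the standard ticker follows. It uses time.NewTicker, for which time.Tick is a convenience wrapper that exposes only the ticking channel. NewTicker is chosen here because the returned ticker can be stopped explicitly. The waitTicks function is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// waitTicks receives n ticks from a standard ticker with
// period d and returns how long that took.
func waitTicks(d time.Duration, n int) time.Duration {
	start := time.Now()
	ticker := time.NewTicker(d)
	defer ticker.Stop() // release the ticker when it is no longer needed
	for i := 0; i < n; i++ {
		<-ticker.C
	}
	return time.Since(start)
}

func main() {
	fmt.Println(waitTicks(100*time.Millisecond, 3)) // roughly 300ms
}
```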

Rate Limiting

One of the above sections has shown how to use try-send to do peak limiting. We can also use try-send to do rate limiting. The following is such an example, borrowed from the official Go wiki. In this example, the average number of handled requests per second over a long period is no more than 10. But sometimes, in a short period, there may be a burst of at most 5 requests being handled in parallel.
import "time"

type Request interface{}
func handle(Request) {/* do something */}

const RateLimit = 10
const BurstLimit = 5 // 1 means bursts are not supported.

func handleRequests(requests <-chan Request) {
	throttle := make(chan time.Time, BurstLimit)

	go func() {
		tick := time.NewTicker(time.Second / RateLimit)
		defer tick.Stop()
		for t := range tick.C {
			select {
			case throttle <- t:
			default:
			}
		}
	}()

	for request := range requests {
		<-throttle
		go handle(request)
	}
}

Switches

From the article channels in Go, we have learned that a goroutine will block for ever if it tries to send a value to or receive from a nil channel. By making use of this fact, we can change the involved channels in a select code block to affect the branch selection in the select code block.

The following is another ping-pong example, implemented by using the select mechanism. In this example, one of the two channel variables involved in the select block is nil. The case branch corresponding to the nil channel will certainly not get selected. We can think of such case branches as being in the off state. At the end of each loop step, the on/off states of the two case branches are switched.
package main

import "fmt"
import "time"
import "os"

type Ball uint8
func Play(playerName string, table chan Ball, serve bool) {
	var receive, send chan Ball
	if serve {
		receive, send = nil, table
	} else {
		receive, send = table, nil
	}
	var lastValue Ball = 1
	for {
		select {
		case send <- lastValue:
		case value := <- receive:
			fmt.Println(playerName, value)
			value += lastValue
			if value < lastValue { // overflow
				os.Exit(0)
			}
			lastValue = value
		}
		receive, send = send, receive // switch on/off
		time.Sleep(time.Second)
	}
}

func main() {
	table := make(chan Ball)
	go Play("A:", table, false)
	Play("B:", table, true)
}

Control Code Execution Possibility Weights

We can duplicate a case branch in a select code block to increase the execution possibility weight of the corresponding code snippet.

Example:
package main

import "fmt"

func main() {
	foo, bar := make(chan struct{}), make(chan struct{})
	close(foo); close(bar) // for demo purpose
	x, y := 0.0, 0.0
	f := func(){x++}
	g := func(){y++}
	for i := 0; i < 100000; i++ {
		select {
		case <-foo: f()
		case <-foo: f()
		case <-bar: g()
		}
	}
	fmt.Println(x/y) // about 2
}

The possibility of the f function being called is about double that of the g function being called.

Select From Dynamic Number Of Cases

We can use the functionalities provided in the reflect standard package to construct a select block at run time. The dynamically created select block can have an arbitrary number of case branches. But please note, the reflection way is less efficient than the fixed way.

The reflect standard package also provides TrySend and TryRecv methods (of the reflect.Value type) to implement one-case-plus-default select blocks.
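
The following sketch shows one way to build such a dynamic select block with the reflect.Select function. The recvAny function is a hypothetical helper which contains only receive cases. A real program might also add send cases or a default case.

```go
package main

import (
	"fmt"
	"reflect"
)

// recvAny receives one value from whichever channel is ready. It
// returns the index of the chosen channel and the received value.
func recvAny(chans []chan int) (int, int) {
	cases := make([]reflect.SelectCase, len(chans))
	for i, c := range chans {
		cases[i] = reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(c),
		}
	}
	// Select blocks until at least one case can proceed.
	chosen, v, _ := reflect.Select(cases)
	return chosen, int(v.Int())
}

func main() {
	chans := []chan int{
		make(chan int, 1), make(chan int, 1), make(chan int, 1),
	}
	chans[1] <- 7 // only the second channel has a value
	fmt.Println(recvAny(chans)) // 1 7
}
```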

Data Flow Manipulations

This section will introduce some use cases in which long-lived channels are used for data flow manipulations. There are many data flow related application scenarios in practice, such as message queues (pub/sub), big data processing (map/reduce), load balancing, and division of labour.

Generally, a data flow application is composed of many modules. Different modules do different jobs. Each module may own one or a group of workers (goroutines), which concurrently do the same job specified for that module. Here is a list of some module job examples in practice:

A worker in a module group may receive data from several other modules as inputs and send data to serve other modules as outputs. In other words, a module can be both a data consumer and a data producer. A module which only sends data to some other modules but never receives data from other modules is called a producer-only module. A module which only receives data from some other modules but never sends data to other modules is called a consumer-only module.

Many modules together compose a data flow system.

The following sections show some data flow module worker implementations. These implementations are for educational purposes, so they may be neither efficient nor flexible.

Data Generation/Collecting/Loading

There are all kinds of producer-only modules. A producer-only module worker may produce a data stream. Here, we use a random number generator as an example. The generator function returns one result but takes no parameters.
import (
	"crypto/rand"
	"encoding/binary"
)

func RandomGenerator() <-chan uint64 {
	c := make(chan uint64)
	go func() {
		rnds := make([]byte, 8)
		for {
			_, err := rand.Read(rnds)
			if err != nil {
				close(c)
				break // stop here: sending to a closed channel panics
			}
			c <- binary.BigEndian.Uint64(rnds)
		}
	}()
	return c
}

In fact, the random number generator is a multi-return future, a concept which was introduced at the start of the current article.

A data producer may close the output stream channel at any time to end data generating.

Data Aggregation

A data aggregation module worker aggregates several data streams of the same data type into one stream. Assuming the data type is uint64, the following function aggregates an arbitrary number of data streams into one.
func Aggregator(inputs ...<-chan uint64) <-chan uint64 {
	output := make(chan uint64)
	for _, in := range inputs {
		in := in // this line is important
		go func() {
			for {
				output <- <-in // <=> output <- (<-in)
			}
		}()
	}
	return output
}

A better implementation should consider whether or not an input stream has been closed. (This also applies to the other module worker implementations below.)
...
		in := in // this line is important
		go func() {
			for {
				x, ok := <-in
				if !ok {
					// This input is closed. Don't close output here:
					// other goroutines may still be sending to it.
					return
				}
				output <- x
			}
		}()
...
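One way to also close the output stream safely, once every input stream has been closed, is to count the forwarding goroutines with a sync.WaitGroup. The following is a minimal sketch under that assumption. The names AggregatorClosing and fromSlice are hypothetical and are not part of the worker functions in this article.

```go
package main

import (
	"fmt"
	"sync"
)

// AggregatorClosing is a hypothetical variant of Aggregator. It closes
// the output stream after all of the input streams have been closed.
func AggregatorClosing(inputs ...<-chan uint64) <-chan uint64 {
	output := make(chan uint64)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		in := in // needed before Go 1.22
		go func() {
			defer wg.Done()
			for x := range in { // ends when this input is closed
				output <- x
			}
		}()
	}
	go func() {
		wg.Wait()     // all forwarding goroutines have exited,
		close(output) // so no one can send to output any more
	}()
	return output
}

// fromSlice is a demo helper. It streams the given values
// and then closes the stream.
func fromSlice(values ...uint64) <-chan uint64 {
	c := make(chan uint64)
	go func() {
		defer close(c)
		for _, v := range values {
			c <- v
		}
	}()
	return c
}

func main() {
	var sum uint64
	for x := range AggregatorClosing(fromSlice(1, 2), fromSlice(3, 4, 5)) {
		sum += x
	}
	fmt.Println(sum) // the arrival order is not fixed, but the sum is
}
```

The output channel is closed by exactly one goroutine, and only after every sender has returned, so neither a double close nor a send on a closed channel can happen.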

If the number of aggregated data streams is very small (two or three), we can use a select block to aggregate these data streams.
// Assume the number of input streams is two.
...
	output := make(chan uint64)
	go func() {
		inA, inB := inputs[0], inputs[1]
		for {
			select {
			case v := <-inA:
				output <- v
			case v := <-inB:
				output <- v
			}
		}
	}()
...

Data Division

A data division module worker does the opposite of a data aggregation module worker. It is easy to implement a division worker, but in practice, division workers are not very useful and seldom used.
func Divisor(input <-chan uint64, outputs ...chan<- uint64) {
	for _, out := range outputs {
		out := out // this line is important
		go func() {
			for {
				out <- <-input // <=> out <- (<-input)
			}
		}()
	}
}

Data Composition

Data composition is like data aggregation, but a data composition worker merges data streams of different data types. For data aggregation, two pieces of data are still two pieces of data. But for data composition, several pieces of data compose one piece of new data.

The following is a composition worker example, in which two uint64 values from one stream and one uint64 value from another stream compose one new uint64 value. Generally, the element types of these stream channels are different in practice. They are the same in this example only to make the data flow system assembling explanations below simpler.
func Composor(inA <-chan uint64, inB <-chan uint64) <-chan uint64 {
	output := make(chan uint64)
	go func() {
		for {
			a1, b, a2 := <-inA, <-inB, <-inA
			output <- a1 ^ b & a2
		}
	}()
	return output
}

Data Decomposition

Data decomposition is the inverse process of data composition. A decomposition worker function implementation takes one input data stream parameter and returns several data stream results.
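The following is a minimal sketch of such a worker. The Decomposer function and its way of splitting a value (the high and low 32-bit halves of each uint64) are assumptions chosen for illustration only.

```go
package main

import "fmt"

// Decomposer is a hypothetical decomposition worker. It splits each
// uint64 value into its high and low 32-bit halves and sends the
// halves to two separate output streams.
func Decomposer(input <-chan uint64) (<-chan uint32, <-chan uint32) {
	high, low := make(chan uint32), make(chan uint32)
	go func() {
		defer close(high)
		defer close(low)
		for x := range input {
			high <- uint32(x >> 32)
			low <- uint32(x) // the conversion keeps only the low 32 bits
		}
	}()
	return high, low
}

func main() {
	input := make(chan uint64)
	go func() {
		defer close(input)
		input <- 0x0000000100000002
	}()
	high, low := Decomposer(input)
	// Receive in the same order as the worker sends: high first, then low.
	fmt.Println(<-high, <-low)
}
```

As the output channels are unbuffered, both of them must be consumed, otherwise the worker blocks on one of its sends.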

Data Duplication/Proliferation

Data duplication (proliferation) can be viewed as a special kind of data decomposition. One piece of data is duplicated, and each of the duplicates is sent to a different output data stream.

An example:
func Duplicator(in <-chan uint64) (<-chan uint64, <-chan uint64) {
	outA, outB := make(chan uint64), make(chan uint64)
	go func() {
		for {
			x := <-in
			outA <- x
			outB <- x
		}
	}()
	return outA, outB
}
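As a usage sketch (the consumer code below is an assumption, not part of the original example): the worker above sends each value to outA and then to outB over unbuffered channels, so both outputs need to be consumed, otherwise the worker blocks. Here each output gets its own consumer goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// Duplicator is copied from the example above.
func Duplicator(in <-chan uint64) (<-chan uint64, <-chan uint64) {
	outA, outB := make(chan uint64), make(chan uint64)
	go func() {
		for {
			x := <-in
			outA <- x
			outB <- x
		}
	}()
	return outA, outB
}

func main() {
	in := make(chan uint64)
	go func() {
		for i := uint64(1); i <= 3; i++ {
			in <- i
		}
	}()
	outA, outB := Duplicator(in)

	var sumA, sumB uint64
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // consumer of the first copy
		defer wg.Done()
		for i := 0; i < 3; i++ {
			sumA += <-outA
		}
	}()
	go func() { // consumer of the second copy
		defer wg.Done()
		for i := 0; i < 3; i++ {
			sumB += <-outB
		}
	}()
	wg.Wait()
	fmt.Println(sumA, sumB) // both consumers saw 1, 2 and 3
}
```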

Data Calculation/Analyzation

The functionalities of data calculation and analysis modules vary widely, and each is very specific. Generally, a worker function of such modules transforms each piece of input data into another piece of output data.

As a simple demonstration, here is a worker example which inverts every bit of each transferred uint64 value.
func Calculator(input <-chan uint64) (<-chan uint64) {
	output := make(chan uint64)
	go func() {
		for {
			x := <-input
			output <- ^x
		}
	}()
	return output
}

Data Validation/Filtering

A data validation or filtering module discards some transferred data in a stream. For example, the following worker function discards all non-prime numbers.
import "math/big"

func Filter(input <-chan uint64) (<-chan uint64) {
	output := make(chan uint64)
	go func() {
		bigInt := big.NewInt(0)
		for {
			x := <-input
			bigInt.SetUint64(x)
			if bigInt.ProbablyPrime(1) {
				output <- x
			}
		}
	}()
	return output
}

Data Serving/Saving

Generally, a data serving or saving module is the last (final output) module in a data flow system. Here we just provide a simple worker which prints each piece of data received from the input stream.

import "fmt"

func Printer(input <-chan uint64) {
	for {
		x, ok := <-input
		if ok {
			fmt.Println(x)
		} else {
			return
		}
	}
}

Data Flow System Assembling

Now, let's use the above module worker functions to assemble several data flow systems. Assembling a data flow system just means creating some workers of different modules and specifying the input streams for every worker.

Data flow system example 1 (a linear pipeline):
package main

... // the worker functions declared above.

func main() {
	Printer(
		Filter(
			Calculator(
				RandomGenerator(),
			),
		),
	)
}

The above data flow system is depicted in the following diagram.

Data flow system example 2 (a directed acyclic graph pipeline):
package main

... // the worker functions declared above.

func main() {
	filterA := Filter(RandomGenerator())
	filterB := Filter(RandomGenerator())
	filterC := Filter(RandomGenerator())
	filter := Aggregator(filterA, filterB, filterC)
	calculatorA := Calculator(filter)
	calculatorB := Calculator(filter)
	calculator := Aggregator(calculatorA, calculatorB)
	Printer(calculator)
}

The above data flow system is depicted in the following diagram.

More complex data flow topologies may be arbitrary graphs. For example, a data flow system may have multiple final outputs. But data flow systems with cyclic-graph topologies are seldom used in reality.

From the above two examples, we can see that it is easy and intuitive to build data flow systems with channels.

From the last example, we can see that, in a data flow system, if we aggregate the respective output streams of all the workers in a module, then we can use the aggregated stream as the input stream of every worker in the next module. In this way, it is easy to fan in and fan out, and so to adjust the number of workers for a specified module.
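The idea can be sketched with a small helper. FanOut below is a hypothetical function, not part of the original worker set. It starts n workers of one module on the same input stream and aggregates their outputs. Calculator and Aggregator are copied from the examples above, so, like them, this sketch does not handle stream closing.

```go
package main

import "fmt"

// Calculator is copied from the example above.
func Calculator(input <-chan uint64) <-chan uint64 {
	output := make(chan uint64)
	go func() {
		for {
			x := <-input
			output <- ^x
		}
	}()
	return output
}

// Aggregator is copied from the example above.
func Aggregator(inputs ...<-chan uint64) <-chan uint64 {
	output := make(chan uint64)
	for _, in := range inputs {
		in := in // needed before Go 1.22
		go func() {
			for {
				output <- <-in
			}
		}()
	}
	return output
}

// FanOut is a hypothetical helper. It starts n workers which all
// receive from the same input stream (fan-out) and aggregates
// their output streams into one stream (fan-in).
func FanOut(n int, worker func(<-chan uint64) <-chan uint64,
	input <-chan uint64) <-chan uint64 {
	outputs := make([]<-chan uint64, n)
	for i := range outputs {
		outputs[i] = worker(input)
	}
	return Aggregator(outputs...)
}

func main() {
	input := make(chan uint64)
	go func() {
		for i := uint64(0); i < 4; i++ {
			input <- i
		}
	}()
	results := FanOut(3, Calculator, input)
	var sum uint64
	for i := 0; i < 4; i++ {
		sum += ^<-results // undo the bit inversion
	}
	fmt.Println(sum) // 0+1+2+3
}
```

Each input value is handled by exactly one of the three workers, so changing n changes only the degree of parallelism, not the set of results.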

The above explanations of data flow systems do not pay much attention to how data streams are closed. Please read this article for explanations on how to gracefully close channels.


