What is the fan-out/fan-in pattern in Golang?
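The fan-out/fan-in pattern is a concurrency pattern in Go that distributes a task across multiple goroutines (fan-out) and then collects their results into a single stream (fan-in).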
How it works
The pattern consists of two main stages: fan-out and fan-in.
- Fan-out: In the fan-out stage, a single task is divided into smaller subtasks that are executed concurrently. Each subtask can be assigned to a separate goroutine (a lightweight thread managed by the Go runtime). This stage distributes the workload across multiple goroutines, which can run in parallel when more than one CPU core is available.
- Fan-in: In the fan-in stage, the outputs of all the concurrently executing subtasks are collected and combined into a single result. This stage waits for all subtasks to complete and handles the synchronization needed to ensure that every result is collected before the program proceeds.
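The two stages can be illustrated with a minimal sketch. The function name sumSquares and the chunkSize parameter are hypothetical and chosen only for this illustration: a slice is split into chunks, each chunk is summed in its own goroutine (fan-out), and the partial sums are added together as they arrive (fan-in).

```go
package main

import "fmt"

// sumSquares is a hypothetical helper used only for illustration.
// It splits nums into chunks of at most chunkSize elements, sums the
// squares of each chunk in its own goroutine (fan-out), and then adds
// up the partial sums (fan-in).
func sumSquares(nums []int, chunkSize int) int {
	if chunkSize < 1 {
		chunkSize = 1
	}
	partials := make(chan int)
	chunks := 0

	// Fan-out: start one goroutine per chunk.
	for start := 0; start < len(nums); start += chunkSize {
		end := start + chunkSize
		if end > len(nums) {
			end = len(nums)
		}
		chunks++
		go func(part []int) {
			sum := 0
			for _, n := range part {
				sum += n * n
			}
			partials <- sum
		}(nums[start:end])
	}

	// Fan-in: receive exactly one partial sum per chunk and combine them.
	total := 0
	for i := 0; i < chunks; i++ {
		total += <-partials
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4, 5}, 2)) // 1+4+9+16+25 = 55
}
```

Because the number of chunks is known in advance, this sketch can fan in by counting receives and does not need a WaitGroup or a closed channel.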
How to implement
The fan-out/fan-in pattern is typically implemented using a combination of goroutines and channels in Go. Each subtask is assigned to a goroutine, and channels are used to pass data between the goroutines. The fan-in stage waits for the completion of all subtasks by coordinating through synchronization primitives like WaitGroups or using a channel to signal the completion of each subtask.
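One common way to express the fan-in stage with a WaitGroup is a function that merges several channels into one. The sketch below is not taken from the example in this Answer; the names emit and merge are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// emit is a hypothetical helper: it sends the given values on a new
// channel from its own goroutine and closes the channel when done.
func emit(values ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// merge is a hypothetical fan-in function: it forwards every value from
// all input channels to a single output channel and closes the output
// once every input has been drained.
func merge(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(inputs))

	// One forwarding goroutine per input channel.
	for _, in := range inputs {
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(in)
	}

	// Close the output only after all forwarders have finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	sum := 0
	for v := range merge(emit(1, 2), emit(3, 4), emit(5)) {
		sum += v
	}
	fmt.Println("Sum:", sum) // arrival order varies, but the sum is 15
}
```

Closing the output channel from a separate goroutine lets the caller use a plain range loop without knowing how many values to expect.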
Let’s take an example in which the task is to double numbers. We create multiple worker goroutines (numWorkers) to process the jobs concurrently. Jobs are sent on the jobs channel, and results are received from the results channel.
The main goroutine produces the jobs by sending them to the jobs channel. The worker goroutines receive the jobs from the channel, perform the task (doubling the number), and send the results to the results channel.
The fan-in stage is coordinated using a WaitGroup whose counter starts at the number of jobs. The main goroutine receives results from the results channel and calls Done() for each one. A separate goroutine calls Wait() and closes the results channel once every result has been processed, which ends the main goroutine's receive loop.
Code
Below is the example that demonstrates the fan-out/fan-in pattern in Go:
package main

import (
	"fmt"
	"sync"
)

// worker performs the task on jobs received and sends results to the results channel.
func worker(id int, jobs <-chan int, results chan<- int) {
	for job := range jobs {
		// Perform the task here
		result := job * 2
		results <- result
	}
}

func main() {
	numJobs := 10
	numWorkers := 4

	// Create buffered channels for jobs and results
	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	// Fan-out: Create worker goroutines
	for w := 1; w <= numWorkers; w++ {
		go worker(w, jobs, results)
	}

	// Produce jobs
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs) // Close the jobs channel to signal no more jobs will be sent

	// Fan-in: Collect results
	var wg sync.WaitGroup
	wg.Add(numJobs) // Set WaitGroup counter to the number of jobs

	// Launch a goroutine to wait for all jobs to finish
	go func() {
		wg.Wait()      // Wait for all jobs to be done
		close(results) // Close the results channel after all jobs are processed
	}()

	// Process results
	for result := range results {
		fmt.Println("Result:", result)
		wg.Done() // Decrease the WaitGroup counter as each result is processed
	}
}
Explanation
Here is a line-by-line explanation of the above code:
- Lines 9–15: Defines a function worker which takes three parameters:
  - id: This is the worker’s identifier (unused in this example).
  - jobs: This is a receive-only channel from which the worker gets jobs.
  - results: This is a send-only channel on which the worker sends results.
  The worker receives jobs from the jobs channel until it is closed, processes each one (doubling the job value), and sends the result to the results channel.
- Lines 18–19: Defines the number of jobs and the number of worker goroutines.
- Lines 22–23: Two buffered channels, jobs and results, are created. A buffered channel has a capacity: a send blocks only when the buffer is full, and a receive blocks only when it is empty. Because both channels have a capacity of numJobs, none of the sends in this program block.
- Lines 26–28: A loop is used to create numWorkers goroutines, each invoking the worker function with the worker’s ID and the jobs and results channels.
- Lines 31–34: A loop produces numJobs jobs by sending job numbers (1 through numJobs) to the jobs channel. After sending all jobs, the channel is closed to signal that no more jobs will be sent.
- Lines 37–38: A sync.WaitGroup named wg is created to wait for all jobs to be completed. The counter is initially set to the number of jobs.
- Lines 41–44: A goroutine is launched to wait for the completion of all jobs. It waits using wg.Wait() until the counter reaches zero, then it closes the results channel to signal that no more results will be sent.
- Lines 47–50: Enters a loop to process results from the results channel. It prints each result and calls wg.Done() to decrement the WaitGroup counter for each processed result. This loop continues until the results channel is closed and all results have been processed, at which point the program exits.
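The example in this Answer counts processed results in the WaitGroup, which relies on the results channel being large enough to hold every result. A commonly used alternative, sketched here under the assumption of a hypothetical wrapper named doubleAll, counts workers instead: each worker calls Done() when the jobs channel is drained, so both channels can be unbuffered.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// doubleAll is a hypothetical wrapper used only for illustration.
// It doubles every number in nums using numWorkers goroutines and
// returns the results sorted, since arrival order is not deterministic.
func doubleAll(nums []int, numWorkers int) []int {
	if numWorkers < 1 {
		numWorkers = 1
	}
	jobs := make(chan int)
	results := make(chan int)

	// Fan-out: the WaitGroup tracks workers, not individual results.
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for w := 0; w < numWorkers; w++ {
		go func() {
			defer wg.Done()
			for job := range jobs {
				results <- job * 2
			}
		}()
	}

	// Produce jobs from a separate goroutine so that the caller is
	// free to receive results while jobs are still being sent.
	go func() {
		for _, n := range nums {
			jobs <- n
		}
		close(jobs)
	}()

	// Close results once every worker has returned.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Fan-in: collect results until the channel is closed.
	var out []int
	for r := range results {
		out = append(out, r)
	}
	sort.Ints(out)
	return out
}

func main() {
	fmt.Println(doubleAll([]int{1, 2, 3, 4, 5}, 3)) // [2 4 6 8 10]
}
```

With this arrangement, the number of jobs does not need to be known when the channels are created.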