Concurrency di Go Dari Basic hingga Advanced
Daftar Isi
1. Pengenalan Concurrency
Apa itu Concurrency?
Concurrency adalah kemampuan program untuk menangani beberapa tugas secara bersamaan (concurrent), bukan paralel. Perbedaannya:
Concurrency: Menangani banyak tugas dalam periode waktu yang sama (dealing with lots of things at once)
Parallelism: Menjalankan banyak tugas secara simultan (doing lots of things at once)
Mengapa Go Unggul dalam Concurrency?
Goroutines: Lightweight threads yang dikelola oleh Go runtime
Channels: Komunikasi antar goroutines yang aman
Built-in scheduler: Mengelola goroutines secara efisien
Memory footprint kecil: Goroutine hanya ~2KB vs thread OS ~1MB
Konsep Dasar
Sequential: Task1 → Task2 → Task3 → Task4
Concurrent: Task1 ↘
Task2 → ↘
Task3 → ↘ → Result
Task4 → ↗
2. Goroutines - Basic
Apa itu Goroutine?
Goroutine adalah fungsi atau method yang berjalan secara concurrent dengan fungsi lain. Sangat ringan dan efisien.
Hands-On: Goroutine Sederhana
package main
import (
"fmt"
"time"
)
// Fungsi biasa
func sayHello(name string) {
for i := 0; i < 3; i++ {
fmt.Printf("Hello %s! (iteration %d)\n", name, i+1)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
// Sequential execution
fmt.Println("=== Sequential Execution ===")
sayHello("Alice")
sayHello("Bob")
fmt.Println("\n=== Concurrent Execution ===")
// Concurrent execution dengan goroutine
go sayHello("Charlie") // Berjalan di goroutine baru
go sayHello("Diana") // Berjalan di goroutine lain
// Main goroutine harus menunggu
time.Sleep(500 * time.Millisecond)
fmt.Println("Main function ends")
}
Output:
=== Sequential Execution ===
Hello Alice! (iteration 1)
Hello Alice! (iteration 2)
Hello Alice! (iteration 3)
Hello Bob! (iteration 1)
Hello Bob! (iteration 2)
Hello Bob! (iteration 3)
=== Concurrent Execution ===
Hello Charlie! (iteration 1)
Hello Diana! (iteration 1)
Hello Charlie! (iteration 2)
Hello Diana! (iteration 2)
Hello Charlie! (iteration 3)
Hello Diana! (iteration 3)
Main function ends
Hands-On: Anonymous Goroutines
package main
import (
"fmt"
"time"
)
func main() {
// Anonymous goroutine
go func() {
fmt.Println("Anonymous goroutine executing")
}()
// Goroutine dengan parameter
go func(msg string) {
fmt.Println(msg)
}("Hello from anonymous goroutine!")
// Multiple goroutines
for i := 0; i < 5; i++ {
go func(id int) {
fmt.Printf("Goroutine %d executing\n", id)
}(i) // PENTING: Pass i sebagai argument
}
time.Sleep(1 * time.Second)
}
⚠️ Common Pitfall: Closure dan Loop Variables
// ❌ SALAH - Closure trap
for i := 0; i < 5; i++ {
go func() {
fmt.Println(i) // Akan print nilai terakhir (5)
}()
}
// ✅ BENAR - Pass sebagai parameter
for i := 0; i < 5; i++ {
go func(id int) {
fmt.Println(id) // Print nilai yang benar
}(i)
}
3. Channels - Basic
Apa itu Channel?
Channel adalah mekanisme komunikasi antar goroutines. Seperti "pipa" untuk mengirim dan menerima data.
Prinsip: "Don't communicate by sharing memory; share memory by communicating."
Hands-On: Channel Dasar
package main
import "fmt"
func main() {
// Membuat channel
messages := make(chan string)
// Goroutine mengirim data ke channel
go func() {
messages <- "Hello" // Send ke channel
messages <- "World"
}()
// Menerima data dari channel
msg1 := <-messages // Receive dari channel
msg2 := <-messages
fmt.Println(msg1)
fmt.Println(msg2)
}
Hands-On: Channel sebagai Synchronization
package main
import (
"fmt"
"time"
)
func worker(done chan bool) {
fmt.Println("Working...")
time.Sleep(1 * time.Second)
fmt.Println("Done working")
done <- true // Signal bahwa pekerjaan selesai
}
func main() {
done := make(chan bool)
go worker(done)
<-done // Block sampai menerima signal
fmt.Println("Worker finished, exiting main")
}
Channel Operations
package main
import "fmt"
func main() {
// 1. Unbuffered channel
ch := make(chan int)
// 2. Send dan Receive
go func() {
ch <- 42 // Send
}()
val := <-ch // Receive
fmt.Println("Received:", val)
// 3. Channel direction (send-only/receive-only)
// Send-only channel
var sendCh chan<- int = make(chan int)
// Receive-only channel
var recvCh <-chan int = make(chan int)
fmt.Printf("sendCh type: %T\n", sendCh)
fmt.Printf("recvCh type: %T\n", recvCh)
}
Hands-On: Ping-Pong dengan Channels
package main
import (
"fmt"
)
func ping(pings chan<- string, msg string) {
pings <- msg
}
func pong(pings <-chan string, pongs chan<- string) {
msg := <-pings
pongs <- msg
}
func main() {
pings := make(chan string)
pongs := make(chan string)
go ping(pings, "passed message")
go pong(pings, pongs)
fmt.Println(<-pongs)
}
4. Channel Operations
Closing Channels
package main
import "fmt"
func main() {
jobs := make(chan int, 5)
done := make(chan bool)
// Worker goroutine
go func() {
for {
job, more := <-jobs // more = false jika channel closed
if more {
fmt.Println("Received job:", job)
} else {
fmt.Println("All jobs received")
done <- true
return
}
}
}()
// Send jobs
for i := 1; i <= 3; i++ {
jobs <- i
fmt.Println("Sent job:", i)
}
close(jobs) // Close channel setelah selesai send
fmt.Println("Sent all jobs")
<-done
}
Range over Channels
package main
import "fmt"
func main() {
queue := make(chan string, 2)
queue <- "one"
queue <- "two"
close(queue)
// Range akan loop sampai channel closed
for elem := range queue {
fmt.Println(elem)
}
}
Hands-On: Producer-Consumer Pattern
package main
import (
"fmt"
"math/rand"
"time"
)
func producer(ch chan<- int, id int) {
for i := 0; i < 5; i++ {
val := rand.Intn(100)
fmt.Printf("Producer %d: Sending %d\n", id, val)
ch <- val
time.Sleep(time.Millisecond * 100)
}
}
func consumer(ch <-chan int, id int, done chan<- bool) {
for val := range ch {
fmt.Printf("Consumer %d: Received %d\n", id, val)
time.Sleep(time.Millisecond * 200)
}
done <- true
}
func main() {
data := make(chan int, 10)
done := make(chan bool)
// Start 2 producers
go producer(data, 1)
go producer(data, 2)
// Start 1 consumer
go consumer(data, 1, done)
// Wait for producers to finish
time.Sleep(time.Second * 1)
close(data)
// Wait for consumer
<-done
fmt.Println("All done!")
}
5. Select Statement
Apa itu Select?
Select memungkinkan goroutine menunggu multiple channel operations. Seperti switch
tapi untuk channels.
Hands-On: Basic Select
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "one"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "two"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("Received", msg1)
case msg2 := <-ch2:
fmt.Println("Received", msg2)
}
}
}
Hands-On: Select dengan Default
package main
import (
"fmt"
"time"
)
func main() {
messages := make(chan string)
signals := make(chan bool)
// Non-blocking select dengan default
select {
case msg := <-messages:
fmt.Println("Received message:", msg)
default:
fmt.Println("No message received")
}
// Non-blocking send
msg := "hi"
select {
case messages <- msg:
fmt.Println("Sent message:", msg)
default:
fmt.Println("No message sent")
}
// Multi-way non-blocking select
select {
case msg := <-messages:
fmt.Println("Received message:", msg)
case sig := <-signals:
fmt.Println("Received signal:", sig)
default:
fmt.Println("No activity")
}
}
Hands-On: Timeout Pattern
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
go func() {
time.Sleep(2 * time.Second)
ch <- "result"
}()
select {
case res := <-ch:
fmt.Println("Received:", res)
case <-time.After(1 * time.Second):
fmt.Println("Timeout! Operation took too long")
}
}
Hands-On: Ticker & Timer
package main
import (
"fmt"
"time"
)
func main() {
// Timer - fire once
timer := time.NewTimer(2 * time.Second)
fmt.Println("Timer started")
<-timer.C
fmt.Println("Timer fired")
// Ticker - fire repeatedly
ticker := time.NewTicker(500 * time.Millisecond)
done := make(chan bool)
go func() {
for {
select {
case <-done:
return
case t := <-ticker.C:
fmt.Println("Tick at", t)
}
}
}()
time.Sleep(2 * time.Second)
ticker.Stop()
done <- true
fmt.Println("Ticker stopped")
}
6. Buffered Channels
Perbedaan Unbuffered vs Buffered
Unbuffered: Send blocks sampai ada yang receive
ch := make(chan int) // capacity 0
Buffered: Send blocks hanya jika buffer penuh
ch := make(chan int, 3) // capacity 3
Hands-On: Buffered Channels
package main
import "fmt"
func main() {
// Buffered channel dengan capacity 2
messages := make(chan string, 2)
// Bisa send 2 values tanpa blocking
messages <- "buffered"
messages <- "channel"
// Receive values
fmt.Println(<-messages)
fmt.Println(<-messages)
// Demo blocking behavior
ch := make(chan int, 2)
ch <- 1
ch <- 2
// ch <- 3 // Ini akan deadlock karena buffer penuh
fmt.Println(<-ch)
ch <- 3 // Sekarang bisa send karena ada space
fmt.Println(<-ch)
fmt.Println(<-ch)
}
Hands-On: Channel Capacity & Length
package main
import "fmt"
func main() {
ch := make(chan int, 5)
fmt.Printf("Initial - Cap: %d, Len: %d\n", cap(ch), len(ch))
ch <- 1
ch <- 2
ch <- 3
fmt.Printf("After sends - Cap: %d, Len: %d\n", cap(ch), len(ch))
<-ch
fmt.Printf("After receive - Cap: %d, Len: %d\n", cap(ch), len(ch))
}
Use Case: Semaphore Pattern
package main
import (
"fmt"
"time"
)
func worker(id int, sem chan struct{}) {
sem <- struct{}{} // Acquire
fmt.Printf("Worker %d: Starting\n", id)
time.Sleep(1 * time.Second)
fmt.Printf("Worker %d: Done\n", id)
<-sem // Release
}
func main() {
// Limit to 3 concurrent workers
maxWorkers := 3
sem := make(chan struct{}, maxWorkers)
for i := 1; i <= 10; i++ {
go worker(i, sem)
}
time.Sleep(5 * time.Second)
}
7. WaitGroups
Apa itu WaitGroup?
WaitGroup digunakan untuk menunggu collection of goroutines selesai.
Hands-On: Basic WaitGroup
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // Decrement counter saat selesai
fmt.Printf("Worker %d: Starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d: Done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1) // Increment counter
go worker(i, &wg)
}
wg.Wait() // Block sampai counter = 0
fmt.Println("All workers completed")
}
Hands-On: WaitGroup dengan Error Handling
package main
import (
"fmt"
"sync"
"time"
)
type Result struct {
ID int
Value int
Error error
}
func worker(id int, wg *sync.WaitGroup, results chan<- Result) {
defer wg.Done()
time.Sleep(time.Millisecond * 100)
// Simulate processing
result := Result{
ID: id,
Value: id * 2,
Error: nil,
}
results <- result
}
func main() {
var wg sync.WaitGroup
numWorkers := 5
results := make(chan Result, numWorkers)
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, &wg, results)
}
// Close results channel setelah semua worker selesai
go func() {
wg.Wait()
close(results)
}()
// Collect results
for result := range results {
fmt.Printf("Worker %d returned: %d\n", result.ID, result.Value)
}
fmt.Println("All workers completed")
}
8. Mutex & RWMutex
Apa itu Mutex?
Mutex (Mutual Exclusion) melindungi shared data dari race conditions.
Race Condition Demo
package main
import (
"fmt"
"sync"
)
func main() {
// ❌ Race condition
counter := 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // NOT thread-safe!
}()
}
wg.Wait()
fmt.Println("Counter (unsafe):", counter) // Tidak selalu 1000!
}
Hands-On: Mutex Solution
package main
import (
"fmt"
"sync"
)
type SafeCounter struct {
mu sync.Mutex
value int
}
func (c *SafeCounter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
counter := SafeCounter{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println("Counter (safe):", counter.Value()) // Selalu 1000!
}
Hands-On: RWMutex (Read-Write Mutex)
package main
import (
"fmt"
"sync"
"time"
)
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func (c *Cache) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
fmt.Printf("Set: %s = %s\n", key, value)
}
func (c *Cache) Get(key string) (string, bool) {
c.mu.RLock() // Read lock - multiple readers OK
defer c.mu.RUnlock()
val, ok := c.data[key]
fmt.Printf("Get: %s = %s\n", key, val)
return val, ok
}
func main() {
cache := Cache{
data: make(map[string]string),
}
var wg sync.WaitGroup
// Writer
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 3; i++ {
cache.Set(fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i))
time.Sleep(100 * time.Millisecond)
}
}()
// Multiple readers
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
time.Sleep(50 * time.Millisecond)
cache.Get("key0")
}(i)
}
wg.Wait()
}
Best Practices
// ✅ BENAR - defer unlock
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
// ❌ SALAH - lupa unlock
func (c *Counter) Increment() {
c.mu.Lock()
c.value++
// Deadlock jika return sebelum unlock!
}
// ✅ BENAR - minimize critical section
func (c *Cache) Process(key string) {
data := c.expensiveComputation()
c.mu.Lock()
c.data[key] = data // Lock hanya saat update
c.mu.Unlock()
}
9. Atomic Operations
Apa itu Atomic?
Atomic operations adalah operasi yang tidak bisa di-interrupt. Lebih cepat dari Mutex untuk operasi sederhana.
Hands-On: Atomic Counter
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var counter int64
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1)
}()
}
wg.Wait()
fmt.Println("Counter:", atomic.LoadInt64(&counter))
}
Hands-On: Atomic Operations Lengkap
package main
import (
"fmt"
"sync/atomic"
)
func main() {
var val int64
// Add
atomic.AddInt64(&val, 10)
fmt.Println("After Add:", atomic.LoadInt64(&val))
// Store
atomic.StoreInt64(&val, 100)
fmt.Println("After Store:", atomic.LoadInt64(&val))
// Swap (exchange)
old := atomic.SwapInt64(&val, 200)
fmt.Println("Old value:", old)
fmt.Println("New value:", atomic.LoadInt64(&val))
// Compare and Swap (CAS)
swapped := atomic.CompareAndSwapInt64(&val, 200, 300)
fmt.Println("Swapped:", swapped)
fmt.Println("Value:", atomic.LoadInt64(&val))
swapped = atomic.CompareAndSwapInt64(&val, 999, 400)
fmt.Println("Swapped:", swapped)
fmt.Println("Value:", atomic.LoadInt64(&val))
}
Hands-On: Atomic Value (Any Type)
package main
import (
"fmt"
"sync/atomic"
"time"
)
type Config struct {
Timeout time.Duration
MaxConn int
}
func main() {
var config atomic.Value
// Store initial config
config.Store(Config{
Timeout: time.Second,
MaxConn: 100,
})
// Read config
go func() {
for i := 0; i < 5; i++ {
cfg := config.Load().(Config)
fmt.Printf("Config: %+v\n", cfg)
time.Sleep(200 * time.Millisecond)
}
}()
// Update config
time.Sleep(500 * time.Millisecond)
config.Store(Config{
Timeout: 2 * time.Second,
MaxConn: 200,
})
time.Sleep(1 * time.Second)
}
Performance Comparison
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
func benchmarkMutex() time.Duration {
var mu sync.Mutex
var counter int64
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < 100000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
counter++
mu.Unlock()
}()
}
wg.Wait()
return time.Since(start)
}
func benchmarkAtomic() time.Duration {
var counter int64
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < 100000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1)
}()
}
wg.Wait()
return time.Since(start)
}
func main() {
mutexTime := benchmarkMutex()
atomicTime := benchmarkAtomic()
fmt.Printf("Mutex: %v\n", mutexTime)
fmt.Printf("Atomic: %v\n", atomicTime)
fmt.Printf("Atomic is %.2fx faster\n", float64(mutexTime)/float64(atomicTime))
}
10. Worker Pool Pattern
Konsep Worker Pool
Worker pool adalah pattern dimana sejumlah goroutines (workers) memproses jobs dari shared queue.
Jobs Queue → [Worker 1]
→ [Worker 2] → Results
→ [Worker 3]
Hands-On: Basic Worker Pool
package main
import (
"fmt"
"sync"
"time"
)
type Job struct {
ID int
Value int
}
type Result struct {
Job Job
Sum int
}
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d: Processing job %d\n", id, job.ID)
time.Sleep(time.Millisecond * 500) // Simulate work
sum := 0
for i := 1; i <= job.Value; i++ {
sum += i
}
results <- Result{
Job: job,
Sum: sum,
}
}
}
func main() {
numWorkers := 3
numJobs := 10
jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
var wg sync.WaitGroup
// Start workers
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// Send jobs
for i := 1; i <= numJobs; i++ {
jobs <- Job{ID: i, Value: i * 10}
}
close(jobs)
// Close results channel after all workers done
go func() {
wg.Wait()
close(results)
}()
// Collect results
for result := range results {
fmt.Printf("Job %d: Sum = %d\n", result.Job.ID, result.Sum)
}
}
Hands-On: Advanced Worker Pool dengan Error Handling
package main
import (
"fmt"
"sync"
"time"
)
type Task struct {
ID int
Data string
}
type TaskResult struct {
Task Task
Result string
Error error
}
type WorkerPool struct {
workers int
tasks chan Task // task queue
results chan TaskResult // result queue
wg sync.WaitGroup
}
func NewWorkerPool(workers int) *WorkerPool {
queueSize := 100
return &WorkerPool{
workers: workers,
tasks: make(chan Task, queueSize),
results: make(chan TaskResult, queueSize),
}
}
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done()
fmt.Printf("Worker %d: Started\n", id)
// Menggunakan for range adalah cara paling umum dan aman
// untuk memproses item dari sebuah channel sampai channel tersebut ditutup.
for task := range wp.tasks {
// Proses task
result, err := wp.processTask(task, id)
wp.results <- TaskResult{
Task: task,
Result: result,
Error: err,
}
}
// Worker akan otomatis keluar dari loop dan sampai ke baris ini
// ketika channel wp.tasks ditutup dan semua task di dalamnya sudah diproses.
fmt.Printf("Worker %d: Shutting down (tasks channel closed)\n", id)
}
func (wp *WorkerPool) processTask(task Task, workerID int) (string, error) {
// Simulasi kerja
time.Sleep(time.Millisecond * 100)
return fmt.Sprintf("Processed: %s by worker %d", task.Data, workerID), nil
}
func (wp *WorkerPool) Start() {
for i := 1; i <= wp.workers; i++ {
wp.wg.Add(1)
go wp.worker(i)
}
}
func (wp *WorkerPool) Submit(task Task) {
wp.tasks <- task
}
func (wp *WorkerPool) Shutdown() {
// Cukup tutup channel tasks. Ini akan menjadi sinyal bagi semua worker
// untuk menyelesaikan task yang tersisa dan kemudian berhenti.
close(wp.tasks)
// Tunggu semua worker selesai (setelah mereka memanggil wg.Done())
wp.wg.Wait()
// Setelah semua worker berhenti (dan tidak akan mengirim result lagi),
// kita bisa aman menutup channel results.
close(wp.results)
}
func (wp *WorkerPool) Results() <-chan TaskResult {
return wp.results
}
func main() {
pool := NewWorkerPool(5)
pool.Start()
// Submit tasks
go func() {
// PENTING: Panggil Shutdown di dalam goroutine yang sama
// setelah semua task selesai di-submit.
defer pool.Shutdown()
for i := 1; i <= 20; i++ {
pool.Submit(Task{
ID: i,
Data: fmt.Sprintf("Task-%d", i),
})
}
}()
// Collect results
successCount := 0
errorCount := 0
// Loop ini akan berhenti secara otomatis ketika channel results ditutup oleh Shutdown()
for result := range pool.Results() {
if result.Error != nil {
fmt.Printf("Task %d failed: %v\n", result.Task.ID, result.Error)
errorCount++
} else {
fmt.Printf("Task %d: %s\n", result.Task.ID, result.Result)
successCount++
}
}
fmt.Printf("\nSummary: %d succeeded, %d failed\n", successCount, errorCount)
}
11. Pipeline Pattern
Konsep Pipeline
Pipeline adalah series of stages yang terhubung oleh channels, dimana setiap stage adalah group of goroutines yang menjalankan fungsi yang sama.
Input → [Stage 1] → [Stage 2] → [Stage 3] → Output
Hands-On: Simple Pipeline
package main
import "fmt"
// Stage 1: Generate numbers
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// Stage 2: Square numbers
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// Stage 3: Filter even numbers
func filterEven(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
if n%2 == 0 {
out <- n
}
}
close(out)
}()
return out
}
func main() {
// Build pipeline
numbers := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squared := square(numbers)
evens := filterEven(squared)
// Consume output
for n := range evens {
fmt.Println(n)
}
}
Hands-On: Data Processing Pipeline
package main
import (
"fmt"
"strings"
"time"
)
type Data struct {
ID int
Value string
}
// Stage 1: Fetch data
func fetchData(ids []int) <-chan Data {
out := make(chan Data)
go func() {
defer close(out)
for _, id := range ids {
time.Sleep(50 * time.Millisecond) // Simulate fetch
out <- Data{
ID: id,
Value: fmt.Sprintf("raw_data_%d", id),
}
}
}()
return out
}
// Stage 2: Transform data
func transform(in <-chan Data) <-chan Data {
out := make(chan Data)
go func() {
defer close(out)
for data := range in {
time.Sleep(30 * time.Millisecond) // Simulate processing
data.Value = strings.ToUpper(data.Value)
out <- data
}
}()
return out
}
// Stage 3: Validate data
func validate(in <-chan Data) <-chan Data {
out := make(chan Data)
go func() {
defer close(out)
for data := range in {
if len(data.Value) > 0 {
out <- data
}
}
}()
return out
}
// Stage 4: Save data
func save(in <-chan Data) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for data := range in {
time.Sleep(20 * time.Millisecond) // Simulate save
out <- fmt.Sprintf("Saved: ID=%d, Value=%s", data.ID, data.Value)
}
}()
return out
}
func main() {
start := time.Now()
// Build pipeline
ids := []int{1, 2, 3, 4, 5}
pipeline := save(validate(transform(fetchData(ids))))
// Consume results
for result := range pipeline {
fmt.Println(result)
}
fmt.Printf("\nTotal time: %v\n", time.Since(start))
}
Hands-On: Pipeline dengan Cancellation
package main
import (
"fmt"
"time"
)
func generator(done <-chan struct{}, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-done:
fmt.Println("Generator: Cancelled")
return
}
}
}()
return out
}
func square(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
fmt.Println("Square: Cancelled")
return
}
}
}()
return out
}
func main() {
done := make(chan struct{})
numbers := generator(done, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squared := square(done, numbers)
// Cancel after processing 3 items
count := 0
for n := range squared {
fmt.Println(n)
count++
if count == 3 {
close(done)
break
}
}
time.Sleep(100 * time.Millisecond)
fmt.Println("Pipeline cancelled")
}
12. Fan-Out Fan-In Pattern
Konsep Fan-Out Fan-In
Fan-Out: Multiple goroutines membaca dari channel yang sama Fan-In: Multiple goroutines menulis ke channel yang sama
→ [Worker 1] →
Input → → [Worker 2] → → Output (merge)
→ [Worker 3] →
Hands-On: Fan-Out Fan-In
package main
import (
"fmt"
"sync"
"time"
)
// Generate numbers
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// Fan-out: Multiple workers process from same input
func worker(id int, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
fmt.Printf("Worker %d processing %d\n", id, n)
time.Sleep(time.Millisecond * 100)
out <- n * n
}
}()
return out
}
// Fan-in: Merge multiple channels into one
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start goroutine for each input channel
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}
wg.Add(len(channels))
for _, c := range channels {
go output(c)
}
// Close out after all inputs are done
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
start := time.Now()
// Generate input
input := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Fan-out to 3 workers
worker1 := worker(1, input)
worker2 := worker(2, input)
worker3 := worker(3, input)
// Fan-in results
results := merge(worker1, worker2, worker3)
// Collect all results
for result := range results {
fmt.Println("Result:", result)
}
fmt.Printf("\nTotal time: %v\n", time.Since(start))
}
Hands-On: Advanced Fan-Out dengan Load Balancing
package main
import (
"fmt"
"sync"
"time"
)
type Job struct {
ID int
Duration time.Duration
}
type Result struct {
JobID int
WorkerID int
Output string
}
func jobGenerator(jobs []Job) <-chan Job {
out := make(chan Job)
go func() {
defer close(out)
for _, job := range jobs {
out <- job
}
}()
return out
}
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d: Started job %d\n", id, job.ID)
time.Sleep(job.Duration)
results <- Result{
JobID: job.ID,
WorkerID: id,
Output: fmt.Sprintf("Job %d completed", job.ID),
}
fmt.Printf("Worker %d: Finished job %d\n", id, job.ID)
}
}
func main() {
// Create jobs with varying durations
jobs := []Job{
{ID: 1, Duration: 2 * time.Second},
{ID: 2, Duration: 1 * time.Second},
{ID: 3, Duration: 3 * time.Second},
{ID: 4, Duration: 1 * time.Second},
{ID: 5, Duration: 2 * time.Second},
{ID: 6, Duration: 1 * time.Second},
}
start := time.Now()
numWorkers := 3
jobChan := jobGenerator(jobs)
results := make(chan Result, len(jobs))
var wg sync.WaitGroup
// Fan-out: Start workers
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobChan, results, &wg)
}
// Close results when all workers done
go func() {
wg.Wait()
close(results)
}()
// Collect results
for result := range results {
fmt.Printf("Result: %s (by Worker %d)\n", result.Output, result.WorkerID)
}
fmt.Printf("\nTotal time: %v\n", time.Since(start))
fmt.Println("Without concurrency would take:", 10*time.Second)
}
13. Context Package
Apa itu Context?
Context digunakan untuk:
Cancellation signals
Deadlines & timeouts
Request-scoped values
Hands-On: Context dengan Cancellation
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d: Cancelled (%v)\n", id, ctx.Err())
return
default:
fmt.Printf("Worker %d: Working...\n", id)
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
// Create cancellable context
ctx, cancel := context.WithCancel(context.Background())
// Start workers
for i := 1; i <= 3; i++ {
go worker(ctx, i)
}
// Let them work for 2 seconds
time.Sleep(2 * time.Second)
// Cancel all workers
fmt.Println("Main: Cancelling workers...")
cancel()
time.Sleep(1 * time.Second)
fmt.Println("Main: Done")
}
Hands-On: Context dengan Timeout
package main
import (
"context"
"fmt"
"time"
)
func slowOperation(ctx context.Context) error {
done := make(chan struct{})
go func() {
// Simulate slow operation
time.Sleep(3 * time.Second)
close(done)
}()
select {
case <-done:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func main() {
// Context dengan timeout 2 detik
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
fmt.Println("Starting operation...")
err := slowOperation(ctx)
if err != nil {
fmt.Println("Operation failed:", err)
} else {
fmt.Println("Operation succeeded")
}
}
Hands-On: Context dengan Deadline
package main
import (
"context"
"fmt"
"time"
)
func processData(ctx context.Context, data string) {
deadline, ok := ctx.Deadline()
if ok {
fmt.Printf("Deadline: %v\n", deadline)
}
for i := 1; i <= 5; i++ {
select {
case <-ctx.Done():
fmt.Println("Processing cancelled:", ctx.Err())
return
default:
fmt.Printf("Processing step %d\n", i)
time.Sleep(500 * time.Millisecond)
}
}
fmt.Println("Processing complete")
}
func main() {
// Set deadline 2 detik dari sekarang
deadline := time.Now().Add(2 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
go processData(ctx, "important data")
time.Sleep(3 * time.Second)
}
Hands-On: Context Values
package main
import (
"context"
"fmt"
)
type contextKey string
func processRequest(ctx context.Context) {
// Extract values from context
if userID, ok := ctx.Value(contextKey("userID")).(string); ok {
fmt.Println("User ID:", userID)
}
if requestID, ok := ctx.Value(contextKey("requestID")).(string); ok {
fmt.Println("Request ID:", requestID)
}
// Pass context to other functions
doWork(ctx)
}
func doWork(ctx context.Context) {
if userID, ok := ctx.Value(contextKey("userID")).(string); ok {
fmt.Println("Doing work for user:", userID)
}
}
func main() {
// Create context with values
ctx := context.Background()
ctx = context.WithValue(ctx, contextKey("userID"), "user123")
ctx = context.WithValue(ctx, contextKey("requestID"), "req456")
processRequest(ctx)
}
Hands-On: Real-World HTTP Request dengan Context
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
type Response struct {
Data string
Err error
}
func fetchData(ctx context.Context, url string) Response {
resultCh := make(chan Response, 1)
go func() {
// Simulate HTTP request
time.Sleep(time.Duration(rand.Intn(3000)) * time.Millisecond)
resultCh <- Response{
Data: fmt.Sprintf("Data from %s", url),
Err: nil,
}
}()
select {
case result := <-resultCh:
return result
case <-ctx.Done():
return Response{
Data: "",
Err: ctx.Err(),
}
}
}
func main() {
urls := []string{
"https://api1.example.com",
"https://api2.example.com",
"https://api3.example.com",
}
// Set timeout untuk semua requests
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
results := make(chan Response, len(urls))
for _, url := range urls {
go func(u string) {
results <- fetchData(ctx, u)
}(url)
}
for i := 0; i < len(urls); i++ {
result := <-results
if result.Err != nil {
fmt.Printf("Error: %v\n", result.Err)
} else {
fmt.Printf("Success: %s\n", result.Data)
}
}
}
14. Rate Limiting
Konsep Rate Limiting
Rate limiting membatasi jumlah operasi per unit waktu.
Hands-On: Simple Rate Limiter dengan Ticker
package main
import (
"fmt"
"time"
)
func main() {
// Allow 1 request per 200ms
limiter := time.Tick(200 * time.Millisecond)
requests := make(chan int, 5)
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)
for req := range requests {
<-limiter // Wait for rate limiter
fmt.Println("Request", req, time.Now())
}
}
Hands-On: Bursty Rate Limiter
package main
import (
"fmt"
"time"
)
func main() {
// Allow bursts of 3 requests
burstyLimiter := make(chan time.Time, 3)
// Fill bucket initially
for i := 0; i < 3; i++ {
burstyLimiter <- time.Now()
}
// Refill 1 token every 200ms
go func() {
for t := range time.Tick(200 * time.Millisecond) {
burstyLimiter <- t
}
}()
// Simulate 10 requests
requests := make(chan int, 10)
for i := 1; i <= 10; i++ {
requests <- i
}
close(requests)
for req := range requests {
<-burstyLimiter
fmt.Println("Request", req, time.Now())
}
}
Hands-On: Token Bucket Rate Limiter
package main
import (
"fmt"
"sync"
"time"
)
type RateLimiter struct {
tokens int
maxTokens int
refillRate time.Duration
mu sync.Mutex
}
func NewRateLimiter(maxTokens int, refillRate time.Duration) *RateLimiter {
rl := &RateLimiter{
tokens: maxTokens,
maxTokens: maxTokens,
refillRate: refillRate,
}
// Start refill goroutine
go rl.refill()
return rl
}
func (rl *RateLimiter) refill() {
ticker := time.NewTicker(rl.refillRate)
defer ticker.Stop()
for range ticker.C {
rl.mu.Lock()
if rl.tokens < rl.maxTokens {
rl.tokens++
}
rl.mu.Unlock()
}
}
func (rl *RateLimiter) Allow() bool {
rl.mu.Lock()
defer rl.mu.Unlock()
if rl.tokens > 0 {
rl.tokens--
return true
}
return false
}
func (rl *RateLimiter) Wait() {
for !rl.Allow() {
time.Sleep(10 * time.Millisecond)
}
}
func main() {
limiter := NewRateLimiter(5, 500*time.Millisecond)
for i := 1; i <= 10; i++ {
limiter.Wait()
fmt.Printf("Request %d at %v\n", i, time.Now())
}
}
Hands-On: Rate Limiter dengan golang.org/x/time/rate
package main
import (
"context"
"fmt"
"time"
// Uncomment jika package tersedia:
// "golang.org/x/time/rate"
)
// Simulasi implementasi sederhana
type Limiter struct {
rate int
burst int
tokens chan struct{}
}
func NewLimiter(rate, burst int) *Limiter {
l := &Limiter{
rate: rate,
burst: burst,
tokens: make(chan struct{}, burst),
}
// Fill initial tokens
for i := 0; i < burst; i++ {
l.tokens <- struct{}{}
}
// Refill tokens
go func() {
ticker := time.NewTicker(time.Second / time.Duration(rate))
defer ticker.Stop()
for range ticker.C {
select {
case l.tokens <- struct{}{}:
default:
}
}
}()
return l
}
func (l *Limiter) Wait(ctx context.Context) error {
select {
case <-l.tokens:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func main() {
// Allow 2 requests per second, burst of 5
limiter := NewLimiter(2, 5)
for i := 1; i <= 10; i++ {
ctx := context.Background()
if err := limiter.Wait(ctx); err != nil {
fmt.Printf("Request %d: Rate limited\n", i)
continue
}
fmt.Printf("Request %d: Allowed at %v\n", i, time.Now())
}
}
15. Error Handling dalam Concurrency
Challenge dalam Error Handling
Goroutines tidak bisa return error secara langsung ke caller.
Hands-On: Error Handling dengan Channels
package main
import (
"errors"
"fmt"
"math/rand"
)
type Result struct {
Value int
Error error
}
func worker(id int, results chan<- Result) {
// Simulate work that might fail
if rand.Intn(10) < 3 {
results <- Result{
Value: 0,
Error: errors.New(fmt.Sprintf("worker %d failed", id)),
}
return
}
results <- Result{
Value: id * 2,
Error: nil,
}
}
func main() {
numWorkers := 10
results := make(chan Result, numWorkers)
for i := 1; i <= numWorkers; i++ {
go worker(i, results)
}
successCount := 0
errorCount := 0
for i := 0; i < numWorkers; i++ {
result := <-results
if result.Error != nil {
fmt.Printf("Error: %v\n", result.Error)
errorCount++
} else {
fmt.Printf("Success: %d\n", result.Value)
successCount++
}
}
fmt.Printf("\nTotal: %d success, %d errors\n", successCount, errorCount)
}
Hands-On: errgroup Package Pattern
package main
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"
)
// Simulasi errgroup
type Group struct {
wg sync.WaitGroup
errMu sync.Mutex
err error
cancel context.CancelFunc
}
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
func (g *Group) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
g.errMu.Lock()
if g.err == nil {
g.err = err
if g.cancel != nil {
g.cancel()
}
}
g.errMu.Unlock()
}
}()
}
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
func fetchData(ctx context.Context, id int) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Duration(rand.Intn(500)) * time.Millisecond):
if rand.Intn(10) < 3 {
return errors.New(fmt.Sprintf("fetch %d failed", id))
}
fmt.Printf("Fetch %d succeeded\n", id)
return nil
}
}
func main() {
g, ctx := WithContext(context.Background())
for i := 1; i <= 10; i++ {
id := i
g.Go(func() error {
return fetchData(ctx, id)
})
}
if err := g.Wait(); err != nil {
fmt.Println("\nFirst error:", err)
} else {
fmt.Println("\nAll operations succeeded")
}
}
Hands-On: Panic Recovery dalam Goroutines
package main
import (
"fmt"
"time"
)
func riskyWorker(id int) {
defer func() {
if r := recover(); r != nil {
fmt.Printf("Worker %d recovered from panic: %v\n", id, r)
}
}()
fmt.Printf("Worker %d starting\n", id)
if id%2 == 0 {
panic(fmt.Sprintf("Worker %d panicked!", id))
}
time.Sleep(100 * time.Millisecond)
fmt.Printf("Worker %d completed\n", id)
}
func main() {
for i := 1; i <= 5; i++ {
go riskyWorker(i)
}
time.Sleep(1 * time.Second)
fmt.Println("Main function continues despite panics")
}
Hands-On: Timeout Pattern dengan Error
package main
import (
"context"
"errors"
"fmt"
"time"
)
func slowOperation(ctx context.Context) (string, error) {
resultCh := make(chan string)
errCh := make(chan error)
go func() {
time.Sleep(2 * time.Second)
resultCh <- "operation completed"
}()
select {
case result := <-resultCh:
return result, nil
case err := <-errCh:
return "", err
case <-ctx.Done():
return "", errors.New("operation timeout")
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
result, err := slowOperation(ctx)
if err != nil {
fmt.Println("Error:", err)
} else {
fmt.Println("Result:", result)
}
}
16. Advanced Patterns
16.1 Semaphore Pattern
package main
import (
"fmt"
"sync"
"time"
)
type Semaphore struct {
sem chan struct{}
}
func NewSemaphore(max int) *Semaphore {
return &Semaphore{
sem: make(chan struct{}, max),
}
}
func (s *Semaphore) Acquire() {
s.sem <- struct{}{}
}
func (s *Semaphore) Release() {
<-s.sem
}
func worker(id int, sem *Semaphore, wg *sync.WaitGroup) {
defer wg.Done()
sem.Acquire()
defer sem.Release()
fmt.Printf("Worker %d: Started\n", id)
time.Sleep(1 * time.Second)
fmt.Printf("Worker %d: Finished\n", id)
}
func main() {
maxConcurrent := 3
sem := NewSemaphore(maxConcurrent)
var wg sync.WaitGroup
for i := 1; i <= 10; i++ {
wg.Add(1)
go worker(i, sem, &wg)
}
wg.Wait()
fmt.Println("All workers completed")
}
16.2 Future/Promise Pattern
package main
import (
"errors"
"fmt"
"time"
)
type Future struct {
result chan interface{}
err chan error
}
func NewFuture(f func() (interface{}, error)) *Future {
future := &Future{
result: make(chan interface{}, 1),
err: make(chan error, 1),
}
go func() {
result, err := f()
if err != nil {
future.err <- err
} else {
future.result <- result
}
}()
return future
}
func (f *Future) Get() (interface{}, error) {
select {
case result := <-f.result:
return result, nil
case err := <-f.err:
return nil, err
}
}
func (f *Future) GetWithTimeout(timeout time.Duration) (interface{}, error) {
select {
case result := <-f.result:
return result, nil
case err := <-f.err:
return nil, err
case <-time.After(timeout):
return nil, errors.New("timeout")
}
}
func expensiveOperation() (interface{}, error) {
time.Sleep(2 * time.Second)
return "Computed result", nil
}
func main() {
fmt.Println("Starting future...")
future := NewFuture(expensiveOperation)
fmt.Println("Doing other work...")
time.Sleep(1 * time.Second)
fmt.Println("Getting future result...")
result, err := future.GetWithTimeout(3 * time.Second)
if err != nil {
fmt.Println("Error:", err)
} else {
fmt.Println("Result:", result)
}
}
16.3 Bounded Parallelism
package main
import (
"fmt"
"sync"
"time"
)
func boundedParallel(tasks []func() error, maxConcurrent int) []error {
var wg sync.WaitGroup
sem := make(chan struct{}, maxConcurrent)
errors := make([]error, len(tasks))
for i, task := range tasks {
wg.Add(1)
go func(idx int, t func() error) {
defer wg.Done()
sem <- struct{}{} // Acquire
defer func() { <-sem }() // Release
errors[idx] = t()
}(i, task)
}
wg.Wait()
return errors
}
func main() {
tasks := []func() error{
func() error {
fmt.Println("Task 1")
time.Sleep(1 * time.Second)
return nil
},
func() error {
fmt.Println("Task 2")
time.Sleep(1 * time.Second)
return nil
},
func() error {
fmt.Println("Task 3")
time.Sleep(1 * time.Second)
return nil
},
func() error {
fmt.Println("Task 4")
time.Sleep(1 * time.Second)
return nil
},
func() error {
fmt.Println("Task 5")
time.Sleep(1 * time.Second)
return nil
},
}
start := time.Now()
errors := boundedParallel(tasks, 2) // Max 2 concurrent
duration := time.Since(start)
fmt.Printf("\nCompleted in %v\n", duration)
fmt.Printf("Errors: %v\n", errors)
}
16.4 Or-Channel Pattern
package main
import (
"fmt"
"time"
)
func or(channels ...<-chan interface{}) <-chan interface{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-or(append(channels[3:], orDone)...):
}
}
}()
return orDone
}
func sig(after time.Duration) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
time.Sleep(after)
}()
return c
}
func main() {
start := time.Now()
<-or(
sig(2*time.Hour),
sig(5*time.Minute),
sig(1*time.Second),
sig(1*time.Hour),
sig(1*time.Minute),
)
fmt.Printf("Done after %v\n", time.Since(start))
}
16.5 Tee-Channel Pattern
package main
import "fmt"
func tee(in <-chan int) (<-chan int, <-chan int) {
out1 := make(chan int)
out2 := make(chan int)
go func() {
defer close(out1)
defer close(out2)
for val := range in {
out1, out2 := out1, out2 // Shadow variables
for i := 0; i < 2; i++ {
select {
case out1 <- val:
out1 = nil
case out2 <- val:
out2 = nil
}
}
}
}()
return out1, out2
}
func main() {
input := make(chan int)
go func() {
defer close(input)
for i := 1; i <= 5; i++ {
input <- i
}
}()
out1, out2 := tee(input)
for val := range out1 {
fmt.Printf("Out1: %d, Out2: %d\n", val, <-out2)
}
}
16.6 Bridge-Channel Pattern
package main
import (
"fmt"
)
func bridge(chanStream <-chan <-chan int) <-chan int {
valStream := make(chan int)
go func() {
defer close(valStream)
for ch := range chanStream {
for val := range ch {
valStream <- val
}
}
}()
return valStream
}
func main() {
chanStream := make(chan (<-chan int))
go func() {
defer close(chanStream)
for i := 0; i < 3; i++ {
ch := make(chan int)
chanStream <- ch
go func(ch chan int) {
defer close(ch)
for j := 1; j <= 3; j++ {
ch <- j
}
}(ch)
}
}()
for val := range bridge(chanStream) {
fmt.Printf("%d ", val)
}
fmt.Println()
}
16.7 Or-Done Channel Pattern
package main
import (
"fmt"
"time"
)
func orDone(done <-chan struct{}, c <-chan int) <-chan int {
valStream := make(chan int)
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if !ok {
return
}
select {
case valStream <- v:
case <-done:
return
}
}
}
}()
return valStream
}
func main() {
done := make(chan struct{})
defer close(done)
input := make(chan int)
go func() {
defer close(input)
for i := 1; i <= 10; i++ {
input <- i
time.Sleep(100 * time.Millisecond)
}
}()
// Cancel after 500ms
go func() {
time.Sleep(500 * time.Millisecond)
close(done)
}()
for val := range orDone(done, input) {
fmt.Println(val)
}
}
16.8 Replicated Requests Pattern
package main
import (
"fmt"
"math/rand"
"time"
)
func doWork(id int) <-chan string {
result := make(chan string, 1)
go func() {
// Simulate varying response times
delay := time.Duration(rand.Intn(500)) * time.Millisecond
time.Sleep(delay)
result <- fmt.Sprintf("Result from worker %d (took %v)", id, delay)
}()
return result
}
func firstResponse(workers int) string {
channels := make([]<-chan string, workers)
for i := 0; i < workers; i++ {
channels[i] = doWork(i + 1)
}
// Return first response
select {
case result := <-channels[0]:
return result
case result := <-channels[1]:
return result
case result := <-channels[2]:
return result
}
}
func main() {
for i := 0; i < 5; i++ {
start := time.Now()
result := firstResponse(3)
fmt.Printf("Attempt %d: %s (total: %v)\n", i+1, result, time.Since(start))
time.Sleep(100 * time.Millisecond)
}
}
16.9 Queuing Pattern
package main
import (
"fmt"
"sync"
"time"
)
type Queue struct {
items []interface{}
mu sync.Mutex
cond *sync.Cond
}
func NewQueue() *Queue {
q := &Queue{
items: make([]interface{}, 0),
}
q.cond = sync.NewCond(&q.mu)
return q
}
func (q *Queue) Enqueue(item interface{}) {
q.mu.Lock()
defer q.mu.Unlock()
q.items = append(q.items, item)
q.cond.Signal() // Wake up one waiting goroutine
}
func (q *Queue) Dequeue() interface{} {
q.mu.Lock()
defer q.mu.Unlock()
for len(q.items) == 0 {
q.cond.Wait() // Wait until items available
}
item := q.items[0]
q.items = q.items[1:]
return item
}
func (q *Queue) Len() int {
q.mu.Lock()
defer q.mu.Unlock()
return len(q.items)
}
func main() {
queue := NewQueue()
// Producer
go func() {
for i := 1; i <= 10; i++ {
queue.Enqueue(i)
fmt.Printf("Enqueued: %d\n", i)
time.Sleep(100 * time.Millisecond)
}
}()
// Consumer
for i := 0; i < 10; i++ {
item := queue.Dequeue()
fmt.Printf("Dequeued: %d\n", item)
time.Sleep(200 * time.Millisecond)
}
}
16.10 Heartbeat Pattern
package main
import (
"fmt"
"time"
)
func doWork(done <-chan struct{}) (<-chan int, <-chan struct{}) {
heartbeat := make(chan struct{})
results := make(chan int)
go func() {
defer close(heartbeat)
defer close(results)
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for i := 0; i < 10; i++ {
select {
case <-done:
return
case <-ticker.C:
heartbeat <- struct{}{}
case results <- i:
fmt.Printf("Sent result: %d\n", i)
}
time.Sleep(300 * time.Millisecond)
}
}()
return results, heartbeat
}
func main() {
done := make(chan struct{})
defer close(done)
results, heartbeat := doWork(done)
timeout := time.After(10 * time.Second)
for {
select {
case result, ok := <-results:
if !ok {
fmt.Println("Worker completed")
return
}
fmt.Printf("Received result: %d\n", result)
case <-heartbeat:
fmt.Println("💓 Heartbeat received")
case <-timeout:
fmt.Println("Worker timed out")
return
}
}
}
Best Practices & Common Pitfalls
✅ Best Practices
Always close channels from sender side
// ✅ BENAR
go func() {
defer close(ch)
for _, val := range data {
ch <- val
}
}()
Use defer untuk unlock mutex
// ✅ BENAR
func (c *Counter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.count++
}
Pass channels sebagai parameter, bukan variable
// ✅ BENAR
func worker(jobs <-chan Job, results chan<- Result)
// ❌ SALAH
var globalJobs chan Job
Gunakan context untuk cancellation
// ✅ BENAR
func process(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-work:
// process
}
}
Buffer channels ketika producer/consumer independent
// ✅ BENAR untuk fire-and-forget
results := make(chan Result, 100)
❌ Common Pitfalls
Goroutine Leak
// ❌ SALAH - goroutine never exits
go func() {
for {
// No exit condition!
}
}()
// ✅ BENAR
go func() {
for {
select {
case <-done:
return
case work := <-jobs:
// process
}
}
}()
Closing channel multiple times
// ❌ SALAH - panic!
close(ch)
close(ch)
// ✅ BENAR - close once
sync.Once{}.Do(func() {
close(ch)
})
Sending to closed channel
// ❌ SALAH - panic!
close(ch)
ch <- value
// ✅ BENAR - check if closed
select {
case ch <- value:
default:
// channel closed or full
}
Data race
// ❌ SALAH
counter := 0
for i := 0; i < 10; i++ {
go func() {
counter++ // Race!
}()
}
// ✅ BENAR - use mutex or atomic
var mu sync.Mutex
for i := 0; i < 10; i++ {
go func() {
mu.Lock()
counter++
mu.Unlock()
}()
}
Forgetting WaitGroup.Done()
// ❌ SALAH - deadlock!
var wg sync.WaitGroup
wg.Add(1)
go func() {
// Forgot wg.Done()
}()
wg.Wait()
// ✅ BENAR
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// work
}()
wg.Wait()
Performance Tips
1. Profile Concurrent Code
# CPU profiling
go test -cpuprofile=cpu.prof -bench=.
go tool pprof cpu.prof
# Race detection
go test -race
go run -race main.go
2. Right-size Worker Pools
// Rule of thumb:
// CPU-bound: numWorkers = runtime.NumCPU()
// I/O-bound: numWorkers = much higher (100-1000s)
numWorkers := runtime.NumCPU()
3. Use Buffered Channels Wisely
// Unbuffered: synchronization points
ch := make(chan int)
// Buffered: reduce blocking
ch := make(chan int, 100)
4. Prefer Atomic for Simple Counters
// Faster than mutex for simple operations
atomic.AddInt64(&counter, 1)
5. Context Overhead
// Context has overhead - don't overuse
// Use for:
// - Cancellation
// - Timeouts
// - Request-scoped values
// Don't use for:
// - Passing regular function parameters
Testing Concurrent Code
Test dengan Race Detector
package main
import (
"sync"
"testing"
)
func TestCounter(t *testing.T) {
var counter int64
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // Will be caught by -race
}()
}
wg.Wait()
}
// Run: go test -race
Test dengan Timeout
func TestWithTimeout(t *testing.T) {
done := make(chan bool)
go func() {
// Your concurrent code
time.Sleep(100 * time.Millisecond)
done <- true
}()
select {
case <-done:
// Success
case <-time.After(1 * time.Second):
t.Fatal("Test timeout")
}
}
Resources & Further Learning
Official Documentation
Go Concurrency Patterns: https://go.dev/blog/pipelines
Effective Go: https://go.dev/doc/effective_go#concurrency
Go Memory Model: https://go.dev/ref/mem
Books
"Concurrency in Go" by Katherine Cox-Buday
"The Go Programming Language" by Donovan & Kernighan
Tools
go test -race
: Race detectorgo tool trace
: Execution tracerpprof
: Performance profiler
Summary
Key Takeaways
Goroutines: Lightweight, efficient concurrency primitives
Channels: Safe communication between goroutines
Select: Multiplexing channel operations
Sync Package: Low-level synchronization (Mutex, WaitGroup, etc.)
Context: Cancellation, timeouts, and request-scoped values
Patterns: Worker pools, pipelines, fan-out/fan-in
When to Use What
Share memory
Mutex, RWMutex
Simple counter
Atomic operations
Communicate data
Channels
Wait for goroutines
WaitGroup
Limit concurrency
Semaphore, Buffered channel
Cancellation
Context
Timeout
Context.WithTimeout, time.After
Remember
"Don't communicate by sharing memory; share memory by communicating."
Selamat belajar concurrency di Go!
Last updated