Concurrency di Go Dari Basic hingga Advanced

Daftar Isi


1. Pengenalan Concurrency

Apa itu Concurrency?

Concurrency adalah kemampuan program untuk menangani beberapa tugas secara bersamaan (concurrent), bukan paralel. Perbedaannya:

  • Concurrency: Menangani banyak tugas dalam periode waktu yang sama (dealing with lots of things at once)

  • Parallelism: Menjalankan banyak tugas secara simultan (doing lots of things at once)

Mengapa Go Unggul dalam Concurrency?

  1. Goroutines: Lightweight threads yang dikelola oleh Go runtime

  2. Channels: Komunikasi antar goroutines yang aman

  3. Built-in scheduler: Mengelola goroutines secara efisien

  4. Memory footprint kecil: Goroutine hanya ~2KB vs thread OS ~1MB

Konsep Dasar

Sequential:     Task1 → Task2 → Task3 → Task4
Concurrent:     Task1 ↘
                Task2 → ↘
                Task3 →   ↘ → Result
                Task4 →   ↗

2. Goroutines - Basic

Apa itu Goroutine?

Goroutine adalah fungsi atau method yang berjalan secara concurrent dengan fungsi lain. Sangat ringan dan efisien.

Hands-On: Goroutine Sederhana

package main

import (
    "fmt"
    "time"
)

// Fungsi biasa
func sayHello(name string) {
    for i := 0; i < 3; i++ {
        fmt.Printf("Hello %s! (iteration %d)\n", name, i+1)
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    // Sequential execution
    fmt.Println("=== Sequential Execution ===")
    sayHello("Alice")
    sayHello("Bob")
    
    fmt.Println("\n=== Concurrent Execution ===")
    // Concurrent execution dengan goroutine
    go sayHello("Charlie")  // Berjalan di goroutine baru
    go sayHello("Diana")    // Berjalan di goroutine lain
    
    // Main goroutine harus menunggu
    time.Sleep(500 * time.Millisecond)
    fmt.Println("Main function ends")
}

Output:

=== Sequential Execution ===
Hello Alice! (iteration 1)
Hello Alice! (iteration 2)
Hello Alice! (iteration 3)
Hello Bob! (iteration 1)
Hello Bob! (iteration 2)
Hello Bob! (iteration 3)

=== Concurrent Execution ===
Hello Charlie! (iteration 1)
Hello Diana! (iteration 1)
Hello Charlie! (iteration 2)
Hello Diana! (iteration 2)
Hello Charlie! (iteration 3)
Hello Diana! (iteration 3)
Main function ends

Hands-On: Anonymous Goroutines

package main

import (
    "fmt"
    "time"
)

func main() {
    // Anonymous goroutine
    go func() {
        fmt.Println("Anonymous goroutine executing")
    }()
    
    // Goroutine dengan parameter
    go func(msg string) {
        fmt.Println(msg)
    }("Hello from anonymous goroutine!")
    
    // Multiple goroutines
    for i := 0; i < 5; i++ {
        go func(id int) {
            fmt.Printf("Goroutine %d executing\n", id)
        }(i) // PENTING: Pass i sebagai argument
    }
    
    time.Sleep(1 * time.Second)
}

⚠️ Common Pitfall: Closure dan Loop Variables

// ❌ SALAH - Closure trap
for i := 0; i < 5; i++ {
    go func() {
        fmt.Println(i) // Akan print nilai terakhir (5)
    }()
}

// ✅ BENAR - Pass sebagai parameter
for i := 0; i < 5; i++ {
    go func(id int) {
        fmt.Println(id) // Print nilai yang benar
    }(i)
}

3. Channels - Basic

Apa itu Channel?

Channel adalah mekanisme komunikasi antar goroutines. Seperti "pipa" untuk mengirim dan menerima data.

Prinsip: "Don't communicate by sharing memory; share memory by communicating."

Hands-On: Channel Dasar

package main

import "fmt"

func main() {
    // Membuat channel
    messages := make(chan string)
    
    // Goroutine mengirim data ke channel
    go func() {
        messages <- "Hello"  // Send ke channel
        messages <- "World"
    }()
    
    // Menerima data dari channel
    msg1 := <-messages  // Receive dari channel
    msg2 := <-messages
    
    fmt.Println(msg1)
    fmt.Println(msg2)
}

Hands-On: Channel sebagai Synchronization

package main

import (
    "fmt"
    "time"
)

func worker(done chan bool) {
    fmt.Println("Working...")
    time.Sleep(1 * time.Second)
    fmt.Println("Done working")
    
    done <- true  // Signal bahwa pekerjaan selesai
}

func main() {
    done := make(chan bool)
    
    go worker(done)
    
    <-done  // Block sampai menerima signal
    fmt.Println("Worker finished, exiting main")
}

Channel Operations

package main

import "fmt"

func main() {
    // 1. Unbuffered channel
    ch := make(chan int)
    
    // 2. Send dan Receive
    go func() {
        ch <- 42  // Send
    }()
    val := <-ch   // Receive
    fmt.Println("Received:", val)
    
    // 3. Channel direction (send-only/receive-only)
    // Send-only channel
    var sendCh chan<- int = make(chan int)
    
    // Receive-only channel
    var recvCh <-chan int = make(chan int)
    
    fmt.Printf("sendCh type: %T\n", sendCh)
    fmt.Printf("recvCh type: %T\n", recvCh)
}

Hands-On: Ping-Pong dengan Channels

package main

import (
	"fmt"
)

func ping(pings chan<- string, msg string) {
	pings <- msg
}

func pong(pings <-chan string, pongs chan<- string) {
	msg := <-pings
	pongs <- msg
}

func main() {
	pings := make(chan string)
	pongs := make(chan string)

	go ping(pings, "passed message")
	go pong(pings, pongs)

	fmt.Println(<-pongs)
}

4. Channel Operations

Closing Channels

package main

import "fmt"

func main() {
    jobs := make(chan int, 5)
    done := make(chan bool)
    
    // Worker goroutine
    go func() {
        for {
            job, more := <-jobs  // more = false jika channel closed
            if more {
                fmt.Println("Received job:", job)
            } else {
                fmt.Println("All jobs received")
                done <- true
                return
            }
        }
    }()
    
    // Send jobs
    for i := 1; i <= 3; i++ {
        jobs <- i
        fmt.Println("Sent job:", i)
    }
    
    close(jobs)  // Close channel setelah selesai send
    fmt.Println("Sent all jobs")
    
    <-done
}

Range over Channels

package main

import "fmt"

func main() {
    queue := make(chan string, 2)
    
    queue <- "one"
    queue <- "two"
    close(queue)
    
    // Range akan loop sampai channel closed
    for elem := range queue {
        fmt.Println(elem)
    }
}

Hands-On: Producer-Consumer Pattern

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func producer(ch chan<- int, id int) {
    for i := 0; i < 5; i++ {
        val := rand.Intn(100)
        fmt.Printf("Producer %d: Sending %d\n", id, val)
        ch <- val
        time.Sleep(time.Millisecond * 100)
    }
}

func consumer(ch <-chan int, id int, done chan<- bool) {
    for val := range ch {
        fmt.Printf("Consumer %d: Received %d\n", id, val)
        time.Sleep(time.Millisecond * 200)
    }
    done <- true
}

func main() {
    data := make(chan int, 10)
    done := make(chan bool)
    
    // Start 2 producers
    go producer(data, 1)
    go producer(data, 2)
    
    // Start 1 consumer
    go consumer(data, 1, done)
    
    // Wait for producers to finish
    time.Sleep(time.Second * 1)
    close(data)
    
    // Wait for consumer
    <-done
    fmt.Println("All done!")
}

5. Select Statement

Apa itu Select?

Select memungkinkan goroutine menunggu multiple channel operations. Seperti switch tapi untuk channels.

Hands-On: Basic Select

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "one"
    }()
    
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "two"
    }()
    
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("Received", msg1)
        case msg2 := <-ch2:
            fmt.Println("Received", msg2)
        }
    }
}

Hands-On: Select dengan Default

package main

import (
    "fmt"
    "time"
)

func main() {
    messages := make(chan string)
    signals := make(chan bool)
    
    // Non-blocking select dengan default
    select {
    case msg := <-messages:
        fmt.Println("Received message:", msg)
    default:
        fmt.Println("No message received")
    }
    
    // Non-blocking send
    msg := "hi"
    select {
    case messages <- msg:
        fmt.Println("Sent message:", msg)
    default:
        fmt.Println("No message sent")
    }
    
    // Multi-way non-blocking select
    select {
    case msg := <-messages:
        fmt.Println("Received message:", msg)
    case sig := <-signals:
        fmt.Println("Received signal:", sig)
    default:
        fmt.Println("No activity")
    }
}

Hands-On: Timeout Pattern

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    
    go func() {
        time.Sleep(2 * time.Second)
        ch <- "result"
    }()
    
    select {
    case res := <-ch:
        fmt.Println("Received:", res)
    case <-time.After(1 * time.Second):
        fmt.Println("Timeout! Operation took too long")
    }
}

Hands-On: Ticker & Timer

package main

import (
    "fmt"
    "time"
)

func main() {
    // Timer - fire once
    timer := time.NewTimer(2 * time.Second)
    fmt.Println("Timer started")
    <-timer.C
    fmt.Println("Timer fired")
    
    // Ticker - fire repeatedly
    ticker := time.NewTicker(500 * time.Millisecond)
    done := make(chan bool)
    
    go func() {
        for {
            select {
            case <-done:
                return
            case t := <-ticker.C:
                fmt.Println("Tick at", t)
            }
        }
    }()
    
    time.Sleep(2 * time.Second)
    ticker.Stop()
    done <- true
    fmt.Println("Ticker stopped")
}

6. Buffered Channels

Perbedaan Unbuffered vs Buffered

Unbuffered: Send blocks sampai ada yang receive

ch := make(chan int)  // capacity 0

Buffered: Send blocks hanya jika buffer penuh

ch := make(chan int, 3)  // capacity 3

Hands-On: Buffered Channels

package main

import "fmt"

func main() {
    // Buffered channel dengan capacity 2
    messages := make(chan string, 2)
    
    // Bisa send 2 values tanpa blocking
    messages <- "buffered"
    messages <- "channel"
    
    // Receive values
    fmt.Println(<-messages)
    fmt.Println(<-messages)
    
    // Demo blocking behavior
    ch := make(chan int, 2)
    
    ch <- 1
    ch <- 2
    // ch <- 3  // Ini akan deadlock karena buffer penuh
    
    fmt.Println(<-ch)
    ch <- 3  // Sekarang bisa send karena ada space
    
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

Hands-On: Channel Capacity & Length

package main

import "fmt"

func main() {
    ch := make(chan int, 5)
    
    fmt.Printf("Initial - Cap: %d, Len: %d\n", cap(ch), len(ch))
    
    ch <- 1
    ch <- 2
    ch <- 3
    
    fmt.Printf("After sends - Cap: %d, Len: %d\n", cap(ch), len(ch))
    
    <-ch
    
    fmt.Printf("After receive - Cap: %d, Len: %d\n", cap(ch), len(ch))
}

Use Case: Semaphore Pattern

package main

import (
    "fmt"
    "time"
)

func worker(id int, sem chan struct{}) {
    sem <- struct{}{}  // Acquire
    
    fmt.Printf("Worker %d: Starting\n", id)
    time.Sleep(1 * time.Second)
    fmt.Printf("Worker %d: Done\n", id)
    
    <-sem  // Release
}

func main() {
    // Limit to 3 concurrent workers
    maxWorkers := 3
    sem := make(chan struct{}, maxWorkers)
    
    for i := 1; i <= 10; i++ {
        go worker(i, sem)
    }
    
    time.Sleep(5 * time.Second)
}

7. WaitGroups

Apa itu WaitGroup?

WaitGroup digunakan untuk menunggu collection of goroutines selesai.

Hands-On: Basic WaitGroup

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()  // Decrement counter saat selesai
    
    fmt.Printf("Worker %d: Starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d: Done\n", id)
}

func main() {
    var wg sync.WaitGroup
    
    for i := 1; i <= 5; i++ {
        wg.Add(1)  // Increment counter
        go worker(i, &wg)
    }
    
    wg.Wait()  // Block sampai counter = 0
    fmt.Println("All workers completed")
}

Hands-On: WaitGroup dengan Error Handling

package main

import (
    "fmt"
    "sync"
    "time"
)

type Result struct {
    ID    int
    Value int
    Error error
}

func worker(id int, wg *sync.WaitGroup, results chan<- Result) {
    defer wg.Done()
    
    time.Sleep(time.Millisecond * 100)
    
    // Simulate processing
    result := Result{
        ID:    id,
        Value: id * 2,
        Error: nil,
    }
    
    results <- result
}

func main() {
    var wg sync.WaitGroup
    numWorkers := 5
    results := make(chan Result, numWorkers)
    
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, &wg, results)
    }
    
    // Close results channel setelah semua worker selesai
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // Collect results
    for result := range results {
        fmt.Printf("Worker %d returned: %d\n", result.ID, result.Value)
    }
    
    fmt.Println("All workers completed")
}

8. Mutex & RWMutex

Apa itu Mutex?

Mutex (Mutual Exclusion) melindungi shared data dari race conditions.

Race Condition Demo

package main

import (
    "fmt"
    "sync"
)

func main() {
    // ❌ Race condition
    counter := 0
    var wg sync.WaitGroup
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++  // NOT thread-safe!
        }()
    }
    
    wg.Wait()
    fmt.Println("Counter (unsafe):", counter)  // Tidak selalu 1000!
}

Hands-On: Mutex Solution

package main

import (
    "fmt"
    "sync"
)

type SafeCounter struct {
    mu    sync.Mutex
    value int
}

func (c *SafeCounter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *SafeCounter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func main() {
    counter := SafeCounter{}
    var wg sync.WaitGroup
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }
    
    wg.Wait()
    fmt.Println("Counter (safe):", counter.Value())  // Selalu 1000!
}

Hands-On: RWMutex (Read-Write Mutex)

package main

import (
    "fmt"
    "sync"
    "time"
)

type Cache struct {
    mu    sync.RWMutex
    data  map[string]string
}

func (c *Cache) Set(key, value string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.data[key] = value
    fmt.Printf("Set: %s = %s\n", key, value)
}

func (c *Cache) Get(key string) (string, bool) {
    c.mu.RLock()  // Read lock - multiple readers OK
    defer c.mu.RUnlock()
    val, ok := c.data[key]
    fmt.Printf("Get: %s = %s\n", key, val)
    return val, ok
}

func main() {
    cache := Cache{
        data: make(map[string]string),
    }
    
    var wg sync.WaitGroup
    
    // Writer
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < 3; i++ {
            cache.Set(fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i))
            time.Sleep(100 * time.Millisecond)
        }
    }()
    
    // Multiple readers
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            time.Sleep(50 * time.Millisecond)
            cache.Get("key0")
        }(i)
    }
    
    wg.Wait()
}

Best Practices

// ✅ BENAR - defer unlock
func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

// ❌ SALAH - lupa unlock
func (c *Counter) Increment() {
    c.mu.Lock()
    c.value++
    // Deadlock jika return sebelum unlock!
}

// ✅ BENAR - minimize critical section
func (c *Cache) Process(key string) {
    data := c.expensiveComputation()
    
    c.mu.Lock()
    c.data[key] = data  // Lock hanya saat update
    c.mu.Unlock()
}

9. Atomic Operations

Apa itu Atomic?

Atomic operations adalah operasi yang tidak bisa di-interrupt. Lebih cepat dari Mutex untuk operasi sederhana.

Hands-On: Atomic Counter

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    var counter int64
    var wg sync.WaitGroup
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            atomic.AddInt64(&counter, 1)
        }()
    }
    
    wg.Wait()
    fmt.Println("Counter:", atomic.LoadInt64(&counter))
}

Hands-On: Atomic Operations Lengkap

package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    var val int64
    
    // Add
    atomic.AddInt64(&val, 10)
    fmt.Println("After Add:", atomic.LoadInt64(&val))
    
    // Store
    atomic.StoreInt64(&val, 100)
    fmt.Println("After Store:", atomic.LoadInt64(&val))
    
    // Swap (exchange)
    old := atomic.SwapInt64(&val, 200)
    fmt.Println("Old value:", old)
    fmt.Println("New value:", atomic.LoadInt64(&val))
    
    // Compare and Swap (CAS)
    swapped := atomic.CompareAndSwapInt64(&val, 200, 300)
    fmt.Println("Swapped:", swapped)
    fmt.Println("Value:", atomic.LoadInt64(&val))
    
    swapped = atomic.CompareAndSwapInt64(&val, 999, 400)
    fmt.Println("Swapped:", swapped)
    fmt.Println("Value:", atomic.LoadInt64(&val))
}

Hands-On: Atomic Value (Any Type)

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

type Config struct {
    Timeout time.Duration
    MaxConn int
}

func main() {
    var config atomic.Value
    
    // Store initial config
    config.Store(Config{
        Timeout: time.Second,
        MaxConn: 100,
    })
    
    // Read config
    go func() {
        for i := 0; i < 5; i++ {
            cfg := config.Load().(Config)
            fmt.Printf("Config: %+v\n", cfg)
            time.Sleep(200 * time.Millisecond)
        }
    }()
    
    // Update config
    time.Sleep(500 * time.Millisecond)
    config.Store(Config{
        Timeout: 2 * time.Second,
        MaxConn: 200,
    })
    
    time.Sleep(1 * time.Second)
}

Performance Comparison

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

func benchmarkMutex() time.Duration {
    var mu sync.Mutex
    var counter int64
    var wg sync.WaitGroup
    
    start := time.Now()
    
    for i := 0; i < 100000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            counter++
            mu.Unlock()
        }()
    }
    
    wg.Wait()
    return time.Since(start)
}

func benchmarkAtomic() time.Duration {
    var counter int64
    var wg sync.WaitGroup
    
    start := time.Now()
    
    for i := 0; i < 100000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            atomic.AddInt64(&counter, 1)
        }()
    }
    
    wg.Wait()
    return time.Since(start)
}

func main() {
    mutexTime := benchmarkMutex()
    atomicTime := benchmarkAtomic()
    
    fmt.Printf("Mutex:  %v\n", mutexTime)
    fmt.Printf("Atomic: %v\n", atomicTime)
    fmt.Printf("Atomic is %.2fx faster\n", float64(mutexTime)/float64(atomicTime))
}

10. Worker Pool Pattern

Konsep Worker Pool

Worker pool adalah pattern dimana sejumlah goroutines (workers) memproses jobs dari shared queue.

Jobs Queue → [Worker 1]
           → [Worker 2] → Results
           → [Worker 3]

Hands-On: Basic Worker Pool

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID    int
    Value int
}

type Result struct {
    Job   Job
    Sum   int
}

func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for job := range jobs {
        fmt.Printf("Worker %d: Processing job %d\n", id, job.ID)
        time.Sleep(time.Millisecond * 500)  // Simulate work
        
        sum := 0
        for i := 1; i <= job.Value; i++ {
            sum += i
        }
        
        results <- Result{
            Job: job,
            Sum: sum,
        }
    }
}

func main() {
    numWorkers := 3
    numJobs := 10
    
    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)
    
    var wg sync.WaitGroup
    
    // Start workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }
    
    // Send jobs
    for i := 1; i <= numJobs; i++ {
        jobs <- Job{ID: i, Value: i * 10}
    }
    close(jobs)
    
    // Close results channel after all workers done
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // Collect results
    for result := range results {
        fmt.Printf("Job %d: Sum = %d\n", result.Job.ID, result.Sum)
    }
}

Hands-On: Advanced Worker Pool dengan Error Handling

package main

import (
	"fmt"
	"sync"
	"time"
)

type Task struct {
	ID   int
	Data string
}

type TaskResult struct {
	Task   Task
	Result string
	Error  error
}

type WorkerPool struct {
	workers int
	tasks   chan Task       // task queue
	results chan TaskResult // result queue
	wg      sync.WaitGroup
}

func NewWorkerPool(workers int) *WorkerPool {
	queueSize := 100
	return &WorkerPool{
		workers: workers,
		tasks:   make(chan Task, queueSize),
		results: make(chan TaskResult, queueSize),
	}
}

func (wp *WorkerPool) worker(id int) {
	defer wp.wg.Done()
	fmt.Printf("Worker %d: Started\n", id)

	// Menggunakan for range adalah cara paling umum dan aman
	// untuk memproses item dari sebuah channel sampai channel tersebut ditutup.
	for task := range wp.tasks {
		// Proses task
		result, err := wp.processTask(task, id)

		wp.results <- TaskResult{
			Task:   task,
			Result: result,
			Error:  err,
		}
	}

	// Worker akan otomatis keluar dari loop dan sampai ke baris ini
	// ketika channel wp.tasks ditutup dan semua task di dalamnya sudah diproses.
	fmt.Printf("Worker %d: Shutting down (tasks channel closed)\n", id)
}

func (wp *WorkerPool) processTask(task Task, workerID int) (string, error) {
	// Simulasi kerja
	time.Sleep(time.Millisecond * 100)
	return fmt.Sprintf("Processed: %s by worker %d", task.Data, workerID), nil
}

func (wp *WorkerPool) Start() {
	for i := 1; i <= wp.workers; i++ {
		wp.wg.Add(1)
		go wp.worker(i)
	}
}

func (wp *WorkerPool) Submit(task Task) {
	wp.tasks <- task
}

func (wp *WorkerPool) Shutdown() {
	// Cukup tutup channel tasks. Ini akan menjadi sinyal bagi semua worker
	// untuk menyelesaikan task yang tersisa dan kemudian berhenti.
	close(wp.tasks)

	// Tunggu semua worker selesai (setelah mereka memanggil wg.Done())
	wp.wg.Wait()

	// Setelah semua worker berhenti (dan tidak akan mengirim result lagi),
	// kita bisa aman menutup channel results.
	close(wp.results)
}

func (wp *WorkerPool) Results() <-chan TaskResult {
	return wp.results
}

func main() {
	pool := NewWorkerPool(5)
	pool.Start()

	// Submit tasks
	go func() {
		// PENTING: Panggil Shutdown di dalam goroutine yang sama
		// setelah semua task selesai di-submit.
		defer pool.Shutdown()

		for i := 1; i <= 20; i++ {
			pool.Submit(Task{
				ID:   i,
				Data: fmt.Sprintf("Task-%d", i),
			})
		}
	}()

	// Collect results
	successCount := 0
	errorCount := 0

	// Loop ini akan berhenti secara otomatis ketika channel results ditutup oleh Shutdown()
	for result := range pool.Results() {
		if result.Error != nil {
			fmt.Printf("Task %d failed: %v\n", result.Task.ID, result.Error)
			errorCount++
		} else {
			fmt.Printf("Task %d: %s\n", result.Task.ID, result.Result)
			successCount++
		}
	}

	fmt.Printf("\nSummary: %d succeeded, %d failed\n", successCount, errorCount)
}

11. Pipeline Pattern

Konsep Pipeline

Pipeline adalah series of stages yang terhubung oleh channels, dimana setiap stage adalah group of goroutines yang menjalankan fungsi yang sama.

Input → [Stage 1] → [Stage 2] → [Stage 3] → Output

Hands-On: Simple Pipeline

package main

import "fmt"

// Stage 1: Generate numbers
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// Stage 2: Square numbers
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// Stage 3: Filter even numbers
func filterEven(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            if n%2 == 0 {
                out <- n
            }
        }
        close(out)
    }()
    return out
}

func main() {
    // Build pipeline
    numbers := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    squared := square(numbers)
    evens := filterEven(squared)
    
    // Consume output
    for n := range evens {
        fmt.Println(n)
    }
}

Hands-On: Data Processing Pipeline

package main

import (
    "fmt"
    "strings"
    "time"
)

type Data struct {
    ID    int
    Value string
}

// Stage 1: Fetch data
func fetchData(ids []int) <-chan Data {
    out := make(chan Data)
    go func() {
        defer close(out)
        for _, id := range ids {
            time.Sleep(50 * time.Millisecond)  // Simulate fetch
            out <- Data{
                ID:    id,
                Value: fmt.Sprintf("raw_data_%d", id),
            }
        }
    }()
    return out
}

// Stage 2: Transform data
func transform(in <-chan Data) <-chan Data {
    out := make(chan Data)
    go func() {
        defer close(out)
        for data := range in {
            time.Sleep(30 * time.Millisecond)  // Simulate processing
            data.Value = strings.ToUpper(data.Value)
            out <- data
        }
    }()
    return out
}

// Stage 3: Validate data
func validate(in <-chan Data) <-chan Data {
    out := make(chan Data)
    go func() {
        defer close(out)
        for data := range in {
            if len(data.Value) > 0 {
                out <- data
            }
        }
    }()
    return out
}

// Stage 4: Save data
func save(in <-chan Data) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for data := range in {
            time.Sleep(20 * time.Millisecond)  // Simulate save
            out <- fmt.Sprintf("Saved: ID=%d, Value=%s", data.ID, data.Value)
        }
    }()
    return out
}

func main() {
    start := time.Now()
    
    // Build pipeline
    ids := []int{1, 2, 3, 4, 5}
    pipeline := save(validate(transform(fetchData(ids))))
    
    // Consume results
    for result := range pipeline {
        fmt.Println(result)
    }
    
    fmt.Printf("\nTotal time: %v\n", time.Since(start))
}

Hands-On: Pipeline dengan Cancellation

package main

import (
    "fmt"
    "time"
)

func generator(done <-chan struct{}, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- n:
            case <-done:
                fmt.Println("Generator: Cancelled")
                return
            }
        }
    }()
    return out
}

func square(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                fmt.Println("Square: Cancelled")
                return
            }
        }
    }()
    return out
}

func main() {
    done := make(chan struct{})
    
    numbers := generator(done, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    squared := square(done, numbers)
    
    // Cancel after processing 3 items
    count := 0
    for n := range squared {
        fmt.Println(n)
        count++
        if count == 3 {
            close(done)
            break
        }
    }
    
    time.Sleep(100 * time.Millisecond)
    fmt.Println("Pipeline cancelled")
}

12. Fan-Out Fan-In Pattern

Konsep Fan-Out Fan-In

Fan-Out: Multiple goroutines membaca dari channel yang sama Fan-In: Multiple goroutines menulis ke channel yang sama

        → [Worker 1] →
Input → → [Worker 2] → → Output (merge)
        → [Worker 3] →

Hands-On: Fan-Out Fan-In

package main

import (
    "fmt"
    "sync"
    "time"
)

// Generate numbers
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// Fan-out: Multiple workers process from same input
func worker(id int, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            fmt.Printf("Worker %d processing %d\n", id, n)
            time.Sleep(time.Millisecond * 100)
            out <- n * n
        }
    }()
    return out
}

// Fan-in: Merge multiple channels into one
func merge(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    
    // Start goroutine for each input channel
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            out <- n
        }
    }
    
    wg.Add(len(channels))
    for _, c := range channels {
        go output(c)
    }
    
    // Close out after all inputs are done
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

func main() {
    start := time.Now()
    
    // Generate input
    input := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
    // Fan-out to 3 workers
    worker1 := worker(1, input)
    worker2 := worker(2, input)
    worker3 := worker(3, input)
    
    // Fan-in results
    results := merge(worker1, worker2, worker3)
    
    // Collect all results
    for result := range results {
        fmt.Println("Result:", result)
    }
    
    fmt.Printf("\nTotal time: %v\n", time.Since(start))
}

Hands-On: Advanced Fan-Out dengan Load Balancing

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID       int
    Duration time.Duration
}

type Result struct {
    JobID    int
    WorkerID int
    Output   string
}

func jobGenerator(jobs []Job) <-chan Job {
    out := make(chan Job)
    go func() {
        defer close(out)
        for _, job := range jobs {
            out <- job
        }
    }()
    return out
}

func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for job := range jobs {
        fmt.Printf("Worker %d: Started job %d\n", id, job.ID)
        time.Sleep(job.Duration)
        
        results <- Result{
            JobID:    job.ID,
            WorkerID: id,
            Output:   fmt.Sprintf("Job %d completed", job.ID),
        }
        
        fmt.Printf("Worker %d: Finished job %d\n", id, job.ID)
    }
}

func main() {
    // Create jobs with varying durations
    jobs := []Job{
        {ID: 1, Duration: 2 * time.Second},
        {ID: 2, Duration: 1 * time.Second},
        {ID: 3, Duration: 3 * time.Second},
        {ID: 4, Duration: 1 * time.Second},
        {ID: 5, Duration: 2 * time.Second},
        {ID: 6, Duration: 1 * time.Second},
    }
    
    start := time.Now()
    
    numWorkers := 3
    jobChan := jobGenerator(jobs)
    results := make(chan Result, len(jobs))
    
    var wg sync.WaitGroup
    
    // Fan-out: Start workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobChan, results, &wg)
    }
    
    // Close results when all workers done
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // Collect results
    for result := range results {
        fmt.Printf("Result: %s (by Worker %d)\n", result.Output, result.WorkerID)
    }
    
    fmt.Printf("\nTotal time: %v\n", time.Since(start))
    fmt.Println("Without concurrency would take:", 10*time.Second)
}

13. Context Package

Apa itu Context?

Context digunakan untuk:

  • Cancellation signals

  • Deadlines & timeouts

  • Request-scoped values

Hands-On: Context dengan Cancellation

package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context, id int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d: Cancelled (%v)\n", id, ctx.Err())
            return
        default:
            fmt.Printf("Worker %d: Working...\n", id)
            time.Sleep(500 * time.Millisecond)
        }
    }
}

func main() {
    // Create cancellable context
    ctx, cancel := context.WithCancel(context.Background())
    
    // Start workers
    for i := 1; i <= 3; i++ {
        go worker(ctx, i)
    }
    
    // Let them work for 2 seconds
    time.Sleep(2 * time.Second)
    
    // Cancel all workers
    fmt.Println("Main: Cancelling workers...")
    cancel()
    
    time.Sleep(1 * time.Second)
    fmt.Println("Main: Done")
}

Hands-On: Context dengan Timeout

package main

import (
    "context"
    "fmt"
    "time"
)

func slowOperation(ctx context.Context) error {
    done := make(chan struct{})
    
    go func() {
        // Simulate slow operation
        time.Sleep(3 * time.Second)
        close(done)
    }()
    
    select {
    case <-done:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    // Context dengan timeout 2 detik
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    
    fmt.Println("Starting operation...")
    err := slowOperation(ctx)
    
    if err != nil {
        fmt.Println("Operation failed:", err)
    } else {
        fmt.Println("Operation succeeded")
    }
}

Hands-On: Context dengan Deadline

package main

import (
    "context"
    "fmt"
    "time"
)

func processData(ctx context.Context, data string) {
    deadline, ok := ctx.Deadline()
    if ok {
        fmt.Printf("Deadline: %v\n", deadline)
    }
    
    for i := 1; i <= 5; i++ {
        select {
        case <-ctx.Done():
            fmt.Println("Processing cancelled:", ctx.Err())
            return
        default:
            fmt.Printf("Processing step %d\n", i)
            time.Sleep(500 * time.Millisecond)
        }
    }
    
    fmt.Println("Processing complete")
}

func main() {
    // Set deadline 2 detik dari sekarang
    deadline := time.Now().Add(2 * time.Second)
    ctx, cancel := context.WithDeadline(context.Background(), deadline)
    defer cancel()
    
    go processData(ctx, "important data")
    
    time.Sleep(3 * time.Second)
}

Hands-On: Context Values

package main

import (
    "context"
    "fmt"
)

type contextKey string

func processRequest(ctx context.Context) {
    // Extract values from context
    if userID, ok := ctx.Value(contextKey("userID")).(string); ok {
        fmt.Println("User ID:", userID)
    }
    
    if requestID, ok := ctx.Value(contextKey("requestID")).(string); ok {
        fmt.Println("Request ID:", requestID)
    }
    
    // Pass context to other functions
    doWork(ctx)
}

func doWork(ctx context.Context) {
    if userID, ok := ctx.Value(contextKey("userID")).(string); ok {
        fmt.Println("Doing work for user:", userID)
    }
}

func main() {
    // Create context with values
    ctx := context.Background()
    ctx = context.WithValue(ctx, contextKey("userID"), "user123")
    ctx = context.WithValue(ctx, contextKey("requestID"), "req456")
    
    processRequest(ctx)
}

Hands-On: Real-World HTTP Request dengan Context

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"
)

type Response struct {
    Data string
    Err  error
}

func fetchData(ctx context.Context, url string) Response {
    resultCh := make(chan Response, 1)
    
    go func() {
        // Simulate HTTP request
        time.Sleep(time.Duration(rand.Intn(3000)) * time.Millisecond)
        resultCh <- Response{
            Data: fmt.Sprintf("Data from %s", url),
            Err:  nil,
        }
    }()
    
    select {
    case result := <-resultCh:
        return result
    case <-ctx.Done():
        return Response{
            Data: "",
            Err:  ctx.Err(),
        }
    }
}

func main() {
    urls := []string{
        "https://api1.example.com",
        "https://api2.example.com",
        "https://api3.example.com",
    }
    
    // Set timeout untuk semua requests
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    
    results := make(chan Response, len(urls))
    
    for _, url := range urls {
        go func(u string) {
            results <- fetchData(ctx, u)
        }(url)
    }
    
    for i := 0; i < len(urls); i++ {
        result := <-results
        if result.Err != nil {
            fmt.Printf("Error: %v\n", result.Err)
        } else {
            fmt.Printf("Success: %s\n", result.Data)
        }
    }
}

14. Rate Limiting

Konsep Rate Limiting

Rate limiting membatasi jumlah operasi per unit waktu.

Hands-On: Simple Rate Limiter dengan Ticker

package main

import (
    "fmt"
    "time"
)

func main() {
    // Allow 1 request per 200ms
    limiter := time.Tick(200 * time.Millisecond)
    
    requests := make(chan int, 5)
    for i := 1; i <= 5; i++ {
        requests <- i
    }
    close(requests)
    
    for req := range requests {
        <-limiter  // Wait for rate limiter
        fmt.Println("Request", req, time.Now())
    }
}

Hands-On: Bursty Rate Limiter

package main

import (
    "fmt"
    "time"
)

func main() {
    // Allow bursts of 3 requests
    burstyLimiter := make(chan time.Time, 3)
    
    // Fill bucket initially
    for i := 0; i < 3; i++ {
        burstyLimiter <- time.Now()
    }
    
    // Refill 1 token every 200ms
    go func() {
        for t := range time.Tick(200 * time.Millisecond) {
            burstyLimiter <- t
        }
    }()
    
    // Simulate 10 requests
    requests := make(chan int, 10)
    for i := 1; i <= 10; i++ {
        requests <- i
    }
    close(requests)
    
    for req := range requests {
        <-burstyLimiter
        fmt.Println("Request", req, time.Now())
    }
}

Hands-On: Token Bucket Rate Limiter

package main

import (
    "fmt"
    "sync"
    "time"
)

type RateLimiter struct {
    tokens    int
    maxTokens int
    refillRate time.Duration
    mu        sync.Mutex
}

func NewRateLimiter(maxTokens int, refillRate time.Duration) *RateLimiter {
    rl := &RateLimiter{
        tokens:     maxTokens,
        maxTokens:  maxTokens,
        refillRate: refillRate,
    }
    
    // Start refill goroutine
    go rl.refill()
    
    return rl
}

func (rl *RateLimiter) refill() {
    ticker := time.NewTicker(rl.refillRate)
    defer ticker.Stop()
    
    for range ticker.C {
        rl.mu.Lock()
        if rl.tokens < rl.maxTokens {
            rl.tokens++
        }
        rl.mu.Unlock()
    }
}

func (rl *RateLimiter) Allow() bool {
    rl.mu.Lock()
    defer rl.mu.Unlock()
    
    if rl.tokens > 0 {
        rl.tokens--
        return true
    }
    return false
}

func (rl *RateLimiter) Wait() {
    for !rl.Allow() {
        time.Sleep(10 * time.Millisecond)
    }
}

func main() {
    limiter := NewRateLimiter(5, 500*time.Millisecond)
    
    for i := 1; i <= 10; i++ {
        limiter.Wait()
        fmt.Printf("Request %d at %v\n", i, time.Now())
    }
}

Hands-On: Rate Limiter dengan golang.org/x/time/rate

package main

import (
    "context"
    "fmt"
    "time"
    
    // Uncomment jika package tersedia:
    // "golang.org/x/time/rate"
)

// Simulasi implementasi sederhana
type Limiter struct {
    rate  int
    burst int
    tokens chan struct{}
}

func NewLimiter(rate, burst int) *Limiter {
    l := &Limiter{
        rate:   rate,
        burst:  burst,
        tokens: make(chan struct{}, burst),
    }
    
    // Fill initial tokens
    for i := 0; i < burst; i++ {
        l.tokens <- struct{}{}
    }
    
    // Refill tokens
    go func() {
        ticker := time.NewTicker(time.Second / time.Duration(rate))
        defer ticker.Stop()
        
        for range ticker.C {
            select {
            case l.tokens <- struct{}{}:
            default:
            }
        }
    }()
    
    return l
}

func (l *Limiter) Wait(ctx context.Context) error {
    select {
    case <-l.tokens:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    // Allow 2 requests per second, burst of 5
    limiter := NewLimiter(2, 5)
    
    for i := 1; i <= 10; i++ {
        ctx := context.Background()
        if err := limiter.Wait(ctx); err != nil {
            fmt.Printf("Request %d: Rate limited\n", i)
            continue
        }
        fmt.Printf("Request %d: Allowed at %v\n", i, time.Now())
    }
}

15. Error Handling dalam Concurrency

Challenge dalam Error Handling

Goroutines tidak bisa return error secara langsung ke caller.

Hands-On: Error Handling dengan Channels

package main

import (
    "errors"
    "fmt"
    "math/rand"
)

type Result struct {
    Value int
    Error error
}

func worker(id int, results chan<- Result) {
    // Simulate work that might fail
    if rand.Intn(10) < 3 {
        results <- Result{
            Value: 0,
            Error: errors.New(fmt.Sprintf("worker %d failed", id)),
        }
        return
    }
    
    results <- Result{
        Value: id * 2,
        Error: nil,
    }
}

func main() {
    numWorkers := 10
    results := make(chan Result, numWorkers)
    
    for i := 1; i <= numWorkers; i++ {
        go worker(i, results)
    }
    
    successCount := 0
    errorCount := 0
    
    for i := 0; i < numWorkers; i++ {
        result := <-results
        if result.Error != nil {
            fmt.Printf("Error: %v\n", result.Error)
            errorCount++
        } else {
            fmt.Printf("Success: %d\n", result.Value)
            successCount++
        }
    }
    
    fmt.Printf("\nTotal: %d success, %d errors\n", successCount, errorCount)
}

Hands-On: errgroup Package Pattern

package main

import (
    "context"
    "errors"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Simulasi errgroup
type Group struct {
    wg     sync.WaitGroup
    errMu  sync.Mutex
    err    error
    cancel context.CancelFunc
}

func WithContext(ctx context.Context) (*Group, context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    return &Group{cancel: cancel}, ctx
}

func (g *Group) Go(f func() error) {
    g.wg.Add(1)
    go func() {
        defer g.wg.Done()
        
        if err := f(); err != nil {
            g.errMu.Lock()
            if g.err == nil {
                g.err = err
                if g.cancel != nil {
                    g.cancel()
                }
            }
            g.errMu.Unlock()
        }
    }()
}

func (g *Group) Wait() error {
    g.wg.Wait()
    if g.cancel != nil {
        g.cancel()
    }
    return g.err
}

func fetchData(ctx context.Context, id int) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(time.Duration(rand.Intn(500)) * time.Millisecond):
        if rand.Intn(10) < 3 {
            return errors.New(fmt.Sprintf("fetch %d failed", id))
        }
        fmt.Printf("Fetch %d succeeded\n", id)
        return nil
    }
}

func main() {
    g, ctx := WithContext(context.Background())
    
    for i := 1; i <= 10; i++ {
        id := i
        g.Go(func() error {
            return fetchData(ctx, id)
        })
    }
    
    if err := g.Wait(); err != nil {
        fmt.Println("\nFirst error:", err)
    } else {
        fmt.Println("\nAll operations succeeded")
    }
}

Hands-On: Panic Recovery dalam Goroutines

package main

import (
    "fmt"
    "time"
)

func riskyWorker(id int) {
    defer func() {
        if r := recover(); r != nil {
            fmt.Printf("Worker %d recovered from panic: %v\n", id, r)
        }
    }()
    
    fmt.Printf("Worker %d starting\n", id)
    
    if id%2 == 0 {
        panic(fmt.Sprintf("Worker %d panicked!", id))
    }
    
    time.Sleep(100 * time.Millisecond)
    fmt.Printf("Worker %d completed\n", id)
}

func main() {
    for i := 1; i <= 5; i++ {
        go riskyWorker(i)
    }
    
    time.Sleep(1 * time.Second)
    fmt.Println("Main function continues despite panics")
}

Hands-On: Timeout Pattern dengan Error

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

func slowOperation(ctx context.Context) (string, error) {
    resultCh := make(chan string)
    errCh := make(chan error)
    
    go func() {
        time.Sleep(2 * time.Second)
        resultCh <- "operation completed"
    }()
    
    select {
    case result := <-resultCh:
        return result, nil
    case err := <-errCh:
        return "", err
    case <-ctx.Done():
        return "", errors.New("operation timeout")
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()
    
    result, err := slowOperation(ctx)
    if err != nil {
        fmt.Println("Error:", err)
    } else {
        fmt.Println("Result:", result)
    }
}

16. Advanced Patterns

16.1 Semaphore Pattern

package main

import (
    "fmt"
    "sync"
    "time"
)

type Semaphore struct {
    sem chan struct{}
}

func NewSemaphore(max int) *Semaphore {
    return &Semaphore{
        sem: make(chan struct{}, max),
    }
}

func (s *Semaphore) Acquire() {
    s.sem <- struct{}{}
}

func (s *Semaphore) Release() {
    <-s.sem
}

func worker(id int, sem *Semaphore, wg *sync.WaitGroup) {
    defer wg.Done()
    
    sem.Acquire()
    defer sem.Release()
    
    fmt.Printf("Worker %d: Started\n", id)
    time.Sleep(1 * time.Second)
    fmt.Printf("Worker %d: Finished\n", id)
}

func main() {
    maxConcurrent := 3
    sem := NewSemaphore(maxConcurrent)
    var wg sync.WaitGroup
    
    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go worker(i, sem, &wg)
    }
    
    wg.Wait()
    fmt.Println("All workers completed")
}

16.2 Future/Promise Pattern

package main

import (
    "errors"
    "fmt"
    "time"
)

type Future struct {
    result chan interface{}
    err    chan error
}

func NewFuture(f func() (interface{}, error)) *Future {
    future := &Future{
        result: make(chan interface{}, 1),
        err:    make(chan error, 1),
    }
    
    go func() {
        result, err := f()
        if err != nil {
            future.err <- err
        } else {
            future.result <- result
        }
    }()
    
    return future
}

func (f *Future) Get() (interface{}, error) {
    select {
    case result := <-f.result:
        return result, nil
    case err := <-f.err:
        return nil, err
    }
}

func (f *Future) GetWithTimeout(timeout time.Duration) (interface{}, error) {
    select {
    case result := <-f.result:
        return result, nil
    case err := <-f.err:
        return nil, err
    case <-time.After(timeout):
        return nil, errors.New("timeout")
    }
}

func expensiveOperation() (interface{}, error) {
    time.Sleep(2 * time.Second)
    return "Computed result", nil
}

func main() {
    fmt.Println("Starting future...")
    future := NewFuture(expensiveOperation)
    
    fmt.Println("Doing other work...")
    time.Sleep(1 * time.Second)
    
    fmt.Println("Getting future result...")
    result, err := future.GetWithTimeout(3 * time.Second)
    if err != nil {
        fmt.Println("Error:", err)
    } else {
        fmt.Println("Result:", result)
    }
}

16.3 Bounded Parallelism

package main

import (
    "fmt"
    "sync"
    "time"
)

func boundedParallel(tasks []func() error, maxConcurrent int) []error {
    var wg sync.WaitGroup
    sem := make(chan struct{}, maxConcurrent)
    errors := make([]error, len(tasks))
    
    for i, task := range tasks {
        wg.Add(1)
        go func(idx int, t func() error) {
            defer wg.Done()
            
            sem <- struct{}{}        // Acquire
            defer func() { <-sem }() // Release
            
            errors[idx] = t()
        }(i, task)
    }
    
    wg.Wait()
    return errors
}

func main() {
    tasks := []func() error{
        func() error {
            fmt.Println("Task 1")
            time.Sleep(1 * time.Second)
            return nil
        },
        func() error {
            fmt.Println("Task 2")
            time.Sleep(1 * time.Second)
            return nil
        },
        func() error {
            fmt.Println("Task 3")
            time.Sleep(1 * time.Second)
            return nil
        },
        func() error {
            fmt.Println("Task 4")
            time.Sleep(1 * time.Second)
            return nil
        },
        func() error {
            fmt.Println("Task 5")
            time.Sleep(1 * time.Second)
            return nil
        },
    }
    
    start := time.Now()
    errors := boundedParallel(tasks, 2) // Max 2 concurrent
    duration := time.Since(start)
    
    fmt.Printf("\nCompleted in %v\n", duration)
    fmt.Printf("Errors: %v\n", errors)
}

16.4 Or-Channel Pattern

package main

import (
    "fmt"
    "time"
)

func or(channels ...<-chan interface{}) <-chan interface{} {
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }
    
    orDone := make(chan interface{})
    
    go func() {
        defer close(orDone)
        
        switch len(channels) {
        case 2:
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default:
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            case <-or(append(channels[3:], orDone)...):
            }
        }
    }()
    
    return orDone
}

func sig(after time.Duration) <-chan interface{} {
    c := make(chan interface{})
    go func() {
        defer close(c)
        time.Sleep(after)
    }()
    return c
}

func main() {
    start := time.Now()
    
    <-or(
        sig(2*time.Hour),
        sig(5*time.Minute),
        sig(1*time.Second),
        sig(1*time.Hour),
        sig(1*time.Minute),
    )
    
    fmt.Printf("Done after %v\n", time.Since(start))
}

16.5 Tee-Channel Pattern

package main

import "fmt"

func tee(in <-chan int) (<-chan int, <-chan int) {
    out1 := make(chan int)
    out2 := make(chan int)
    
    go func() {
        defer close(out1)
        defer close(out2)
        
        for val := range in {
            out1, out2 := out1, out2 // Shadow variables
            for i := 0; i < 2; i++ {
                select {
                case out1 <- val:
                    out1 = nil
                case out2 <- val:
                    out2 = nil
                }
            }
        }
    }()
    
    return out1, out2
}

func main() {
    input := make(chan int)
    
    go func() {
        defer close(input)
        for i := 1; i <= 5; i++ {
            input <- i
        }
    }()
    
    out1, out2 := tee(input)
    
    for val := range out1 {
        fmt.Printf("Out1: %d, Out2: %d\n", val, <-out2)
    }
}

16.6 Bridge-Channel Pattern

package main

import (
    "fmt"
)

func bridge(chanStream <-chan <-chan int) <-chan int {
    valStream := make(chan int)
    
    go func() {
        defer close(valStream)
        
        for ch := range chanStream {
            for val := range ch {
                valStream <- val
            }
        }
    }()
    
    return valStream
}

func main() {
    chanStream := make(chan (<-chan int))
    
    go func() {
        defer close(chanStream)
        
        for i := 0; i < 3; i++ {
            ch := make(chan int)
            chanStream <- ch
            
            go func(ch chan int) {
                defer close(ch)
                for j := 1; j <= 3; j++ {
                    ch <- j
                }
            }(ch)
        }
    }()
    
    for val := range bridge(chanStream) {
        fmt.Printf("%d ", val)
    }
    fmt.Println()
}

16.7 Or-Done Channel Pattern

package main

import (
    "fmt"
    "time"
)

func orDone(done <-chan struct{}, c <-chan int) <-chan int {
    valStream := make(chan int)
    
    go func() {
        defer close(valStream)
        for {
            select {
            case <-done:
                return
            case v, ok := <-c:
                if !ok {
                    return
                }
                select {
                case valStream <- v:
                case <-done:
                    return
                }
            }
        }
    }()
    
    return valStream
}

func main() {
    done := make(chan struct{})
    defer close(done)
    
    input := make(chan int)
    
    go func() {
        defer close(input)
        for i := 1; i <= 10; i++ {
            input <- i
            time.Sleep(100 * time.Millisecond)
        }
    }()
    
    // Cancel after 500ms
    go func() {
        time.Sleep(500 * time.Millisecond)
        close(done)
    }()
    
    for val := range orDone(done, input) {
        fmt.Println(val)
    }
}

16.8 Replicated Requests Pattern

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func doWork(id int) <-chan string {
    result := make(chan string, 1)
    
    go func() {
        // Simulate varying response times
        delay := time.Duration(rand.Intn(500)) * time.Millisecond
        time.Sleep(delay)
        result <- fmt.Sprintf("Result from worker %d (took %v)", id, delay)
    }()
    
    return result
}

func firstResponse(workers int) string {
    channels := make([]<-chan string, workers)
    
    for i := 0; i < workers; i++ {
        channels[i] = doWork(i + 1)
    }
    
    // Return first response
    select {
    case result := <-channels[0]:
        return result
    case result := <-channels[1]:
        return result
    case result := <-channels[2]:
        return result
    }
}

func main() {
    for i := 0; i < 5; i++ {
        start := time.Now()
        result := firstResponse(3)
        fmt.Printf("Attempt %d: %s (total: %v)\n", i+1, result, time.Since(start))
        time.Sleep(100 * time.Millisecond)
    }
}

16.9 Queuing Pattern

package main

import (
    "fmt"
    "sync"
    "time"
)

type Queue struct {
    items []interface{}
    mu    sync.Mutex
    cond  *sync.Cond
}

func NewQueue() *Queue {
    q := &Queue{
        items: make([]interface{}, 0),
    }
    q.cond = sync.NewCond(&q.mu)
    return q
}

func (q *Queue) Enqueue(item interface{}) {
    q.mu.Lock()
    defer q.mu.Unlock()
    
    q.items = append(q.items, item)
    q.cond.Signal() // Wake up one waiting goroutine
}

func (q *Queue) Dequeue() interface{} {
    q.mu.Lock()
    defer q.mu.Unlock()
    
    for len(q.items) == 0 {
        q.cond.Wait() // Wait until items available
    }
    
    item := q.items[0]
    q.items = q.items[1:]
    return item
}

func (q *Queue) Len() int {
    q.mu.Lock()
    defer q.mu.Unlock()
    return len(q.items)
}

func main() {
    queue := NewQueue()
    
    // Producer
    go func() {
        for i := 1; i <= 10; i++ {
            queue.Enqueue(i)
            fmt.Printf("Enqueued: %d\n", i)
            time.Sleep(100 * time.Millisecond)
        }
    }()
    
    // Consumer
    for i := 0; i < 10; i++ {
        item := queue.Dequeue()
        fmt.Printf("Dequeued: %d\n", item)
        time.Sleep(200 * time.Millisecond)
    }
}

16.10 Heartbeat Pattern

package main

import (
    "fmt"
    "time"
)

func doWork(done <-chan struct{}) (<-chan int, <-chan struct{}) {
    heartbeat := make(chan struct{})
    results := make(chan int)
    
    go func() {
        defer close(heartbeat)
        defer close(results)
        
        ticker := time.NewTicker(500 * time.Millisecond)
        defer ticker.Stop()
        
        for i := 0; i < 10; i++ {
            select {
            case <-done:
                return
            case <-ticker.C:
                heartbeat <- struct{}{}
            case results <- i:
                fmt.Printf("Sent result: %d\n", i)
            }
            
            time.Sleep(300 * time.Millisecond)
        }
    }()
    
    return results, heartbeat
}

func main() {
    done := make(chan struct{})
    defer close(done)
    
    results, heartbeat := doWork(done)
    
    timeout := time.After(10 * time.Second)
    
    for {
        select {
        case result, ok := <-results:
            if !ok {
                fmt.Println("Worker completed")
                return
            }
            fmt.Printf("Received result: %d\n", result)
            
        case <-heartbeat:
            fmt.Println("💓 Heartbeat received")
            
        case <-timeout:
            fmt.Println("Worker timed out")
            return
        }
    }
}

Best Practices & Common Pitfalls

✅ Best Practices

  1. Always close channels from sender side

// ✅ BENAR
go func() {
    defer close(ch)
    for _, val := range data {
        ch <- val
    }
}()
  1. Use defer untuk unlock mutex

// ✅ BENAR
func (c *Counter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}
  1. Pass channels sebagai parameter, bukan variable

// ✅ BENAR
func worker(jobs <-chan Job, results chan<- Result)

// ❌ SALAH
var globalJobs chan Job
  1. Gunakan context untuk cancellation

// ✅ BENAR
func process(ctx context.Context) {
    select {
    case <-ctx.Done():
        return
    case <-work:
        // process
    }
}
  1. Buffer channels ketika producer/consumer independent

// ✅ BENAR untuk fire-and-forget
results := make(chan Result, 100)

❌ Common Pitfalls

  1. Goroutine Leak

// ❌ SALAH - goroutine never exits
go func() {
    for {
        // No exit condition!
    }
}()

// ✅ BENAR
go func() {
    for {
        select {
        case <-done:
            return
        case work := <-jobs:
            // process
        }
    }
}()
  1. Closing channel multiple times

// ❌ SALAH - panic!
close(ch)
close(ch)

// ✅ BENAR - close once
sync.Once{}.Do(func() {
    close(ch)
})
  1. Sending to closed channel

// ❌ SALAH - panic!
close(ch)
ch <- value

// ✅ BENAR - check if closed
select {
case ch <- value:
default:
    // channel closed or full
}
  1. Data race

// ❌ SALAH
counter := 0
for i := 0; i < 10; i++ {
    go func() {
        counter++ // Race!
    }()
}

// ✅ BENAR - use mutex or atomic
var mu sync.Mutex
for i := 0; i < 10; i++ {
    go func() {
        mu.Lock()
        counter++
        mu.Unlock()
    }()
}
  1. Forgetting WaitGroup.Done()

// ❌ SALAH - deadlock!
var wg sync.WaitGroup
wg.Add(1)
go func() {
    // Forgot wg.Done()
}()
wg.Wait()

// ✅ BENAR
var wg sync.WaitGroup
wg.Add(1)
go func() {
    defer wg.Done()
    // work
}()
wg.Wait()

Performance Tips

1. Profile Concurrent Code

# CPU profiling
go test -cpuprofile=cpu.prof -bench=.
go tool pprof cpu.prof

# Race detection
go test -race
go run -race main.go

2. Right-size Worker Pools

// Rule of thumb:
// CPU-bound: numWorkers = runtime.NumCPU()
// I/O-bound: numWorkers = much higher (100-1000s)

numWorkers := runtime.NumCPU()

3. Use Buffered Channels Wisely

// Unbuffered: synchronization points
ch := make(chan int)

// Buffered: reduce blocking
ch := make(chan int, 100)

4. Prefer Atomic for Simple Counters

// Faster than mutex for simple operations
atomic.AddInt64(&counter, 1)

5. Context Overhead

// Context has overhead - don't overuse
// Use for:
// - Cancellation
// - Timeouts
// - Request-scoped values

// Don't use for:
// - Passing regular function parameters

Testing Concurrent Code

Test dengan Race Detector

package main

import (
    "sync"
    "testing"
)

func TestCounter(t *testing.T) {
    var counter int64
    var wg sync.WaitGroup
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++ // Will be caught by -race
        }()
    }
    
    wg.Wait()
}

// Run: go test -race

Test dengan Timeout

func TestWithTimeout(t *testing.T) {
    done := make(chan bool)
    
    go func() {
        // Your concurrent code
        time.Sleep(100 * time.Millisecond)
        done <- true
    }()
    
    select {
    case <-done:
        // Success
    case <-time.After(1 * time.Second):
        t.Fatal("Test timeout")
    }
}

Resources & Further Learning

Official Documentation

  • Go Concurrency Patterns: https://go.dev/blog/pipelines

  • Effective Go: https://go.dev/doc/effective_go#concurrency

  • Go Memory Model: https://go.dev/ref/mem

Books

  • "Concurrency in Go" by Katherine Cox-Buday

  • "The Go Programming Language" by Donovan & Kernighan

Tools

  • go test -race: Race detector

  • go tool trace: Execution tracer

  • pprof: Performance profiler


Summary

Key Takeaways

  1. Goroutines: Lightweight, efficient concurrency primitives

  2. Channels: Safe communication between goroutines

  3. Select: Multiplexing channel operations

  4. Sync Package: Low-level synchronization (Mutex, WaitGroup, etc.)

  5. Context: Cancellation, timeouts, and request-scoped values

  6. Patterns: Worker pools, pipelines, fan-out/fan-in

When to Use What

Use Case
Tool

Share memory

Mutex, RWMutex

Simple counter

Atomic operations

Communicate data

Channels

Wait for goroutines

WaitGroup

Limit concurrency

Semaphore, Buffered channel

Cancellation

Context

Timeout

Context.WithTimeout, time.After

Remember

"Don't communicate by sharing memory; share memory by communicating."

Selamat belajar concurrency di Go!

Last updated