In this post, I’m going to show the way how we can implement a simple queue in Golang, using channels.
Let’s clarify why would we ever use a queue? There many possible reasons. The most common one it’s when we have a list of operations (actions) and we need to process them one by one. This list either could be static or dynamic (when new items arriving continuously). As in the real-life, queue needed to be processed by something or someone. Imagine, a line of people in the supermarket nearby cashier.
Every customer in this line holds his own specific products and wants to pay for them. In other words, every queue item has it own set of data with instructions. A cashier serves his clients from a line one by one (processes queue items). What would happen if all customers decided to pay simultaneously? Probably, something like this:
To prevent this situation, let’s implement a queue in Go quickly :) First, we need to create a Queue struct and constructor function:
// Queue holds name, list of jobs and context with cancel.
type Queue struct {
name string
jobs chan Job
ctx context.Context
cancel context.CancelFunc
}
// Job - holds logic to perform some operations during queue execution.
type Job struct {
Name string
Action func() error // A function that should be executed when the job is running.
}
// NewQueue instantiates new queue.
func NewQueue(name string) *Queue {
ctx, cancel := context.WithCancel(context.Background())
return &Queue{
jobs: make(chan Job),
name: name,
ctx: ctx,
cancel: cancel,
}
}
As you have seen above, I’ve declared an unbuffered channel (with no capacity) to hold Jobs in a Queue. Channels will give us a really powerful possibility to work in a concurrent environment. Moreover, our queue has a context with cancellation. This will help us to understand when all jobs in the queue were finished in all goroutines and after that, we can free all resources.
In the code below we are going to add some methods to the queue
struct.
// AddJobs adds jobs to the queue and cancels channel.
func (q *Queue) AddJobs(jobs []Job) {
var wg sync.WaitGroup
wg.Add(len(jobs))
for _, job := range jobs {
// Goroutine which adds job to the queue.
go func(job Job) {
q.AddJob(job)
wg.Done()
}(job)
}
go func() {
wg.Wait()
// Cancel queue channel, when all goroutines were done.
q.cancel()
}()
}
// AddJob sends job to the channel.
func (q *Queue) AddJob(job Job) {
q.jobs <- job
log.Printf("New job %s added to %s queue", job.Name, q.name)
}
Next, let’s add a Run()
method to the Job
struct, which should execute a job’s action (the main purpose of the job).
// Run performs job execution.
func (j Job) Run() error {
log.Printf("Job running: %s", j.GetName())
err := j.Action()
if err != nil {
return err
}
return nil
}
Alright, now let’s create a worker, who will be responsible for serving our queue.
// Worker responsible for queue serving.
type Worker struct {
Queue *Queue
}
// NewWorker initializes a new Worker.
func NewWorker(queue *Queue) *Worker {
return &Worker{
Queue: queue,
}
}
// DoWork processes jobs from the queue (jobs channel).
func (w *Worker) DoWork() bool {
for {
select {
// if context was canceled.
case <-w.Queue.ctx.Done():
log.Printf("Work done in queue %s: %s!", w.Queue.name, w.Queue.ctx.Err())
return true
// if job received.
case job := <-w.Queue.jobs:
err := job.Run()
if err != nil {
log.Print(err)
continue
}
}
}
}
Pay attention to the DoWork()
method. There we have a for loop, which is listening for the jobs channel,
and if the channel receives a job, it executes it, by running job.Run()
. If the context was canceled, it means,
that all jobs were executed and we can exit from the loop.
Now, let’s see how we can work with just created queue functionality on a real example. I’m going to create a simple program, where we need to import new products into our storage.
package main
import (
"fmt"
"github.com/alexsergivan/blog-examples/queue/queue"
)
// Our products storage.
var products = []string{
"books",
"computers",
}
func main() {
// New products, which we need to add to our products storage.
newProducts := []string{
"apples",
"oranges",
"wine",
"bread",
"orange juice",
}
// New queue initialization.
productsQueue := queue.NewQueue("NewProducts")
var jobs []queue.Job
// Range over new products.
for _, newProduct := range newProducts {
// We need to do this, because variables declared in for loops are passed by reference.
// Otherwise, our closure will always receive the last item from the newProducts.
product := newProduct
// Defining of the closure, where we add a new product to our simple storage (products slice)
action := func() error {
products = append(products, product)
return nil
}
// Append job to jobs slice.
jobs = append(jobs, queue.Job{
Name: fmt.Sprintf("Importing new product: %s", newProduct),
Action: action,
})
}
// Adds jobs to the queue.
productsQueue.AddJobs(jobs)
// Defines a queue worker, which will execute our queue.
worker := queue.NewWorker(productsQueue)
// Execute jobs in queue.
worker.DoWork()
// Prints products storage after queue execution.
defer fmt.Print(products)
}
After executing this program, it will print out this data:
2021/01/12 22:10:33 New job Importing new product: apples added to NewProducts queue
2021/01/12 22:10:33 Job running: Importing new product: apples
2021/01/12 22:10:33 Job running: Importing new product: wine
2021/01/12 22:10:33 Job running: Importing new product: oranges
2021/01/12 22:10:33 Job running: Importing new product: bread
2021/01/12 22:10:33 Job running: Importing new product: orange juice
2021/01/12 22:10:33 New job Importing new product: orange juice added to NewProducts queue
2021/01/12 22:10:33 New job Importing new product: oranges added to NewProducts queue
2021/01/12 22:10:33 New job Importing new product: bread added to NewProducts queue
2021/01/12 22:10:33 New job Importing new product: wine added to NewProducts queue
2021/01/12 22:10:33 Work done in queue NewProducts: context canceled!
[books computers apples wine oranges bread orange juice]%
Please notice the last line in the logs:
[books computers apples wine oranges bread orange juice]
As you can see, the order is different from the order in newProducts
slice. It happens because of goroutines nature.
Sometimes one goroutine needs more time to finish its work than another. For the scenario, when the order is important,
I will write a separate post later.
For now, that’s it :) Hope it was helpful!
The source code you can find here
Hello! In this post, I will explain the cost-effective method I use to host my Go web applications with varying levels of complexity, all starting from as low as $5 per month. This method also allows to easy deploy and scale your golang application.
go hosting digitalocean dockerSince Go 1.19 we can use a new 103 (Early Hints)
http status code when we create web applications. Let’s figure out how and when this could help us.
We are going to create a simple golang web server that servers some html content. One html page will be served with 103
header and another one without.
After loading comparison we will see how early hints can improve page performance.
I guess that almost everyone in the go community was exciting when Go 1.18 was released, especially because of generics. Some days ago I decided to try generics in the real-world application, by refactoring some of its pieces, related to a caching logic.
go generics redis cacheThis time, I will show you how to work with the maps in go effectively and prevent the occurrence of the data race errors. Data races happen when several goroutines access the same resource concurrently and at least one of the accesses is a write.
go concurrent map data raceRecently, I discovered a surprisingly reliable memory caching solution, which I’m planning to use in all my further applications to increase performance. In this blog post, I will share some code examples of how you can integrate Ristretto caching library into your application.
go caching ristretto performance