A Simple Queue Implementation in Golang with channels
12 Jan 2021

In this post, I’m going to show how we can implement a simple queue in Golang using channels.

Let’s clarify why we would ever use a queue. There are many possible reasons. The most common one is when we have a list of operations (actions) and we need to process them one by one. This list can be either static or dynamic (with new items arriving continuously). As in real life, a queue needs to be processed by something or someone. Imagine a line of people at the cashier of a nearby supermarket.

Every customer in this line holds their own specific products and wants to pay for them. In other words, every queue item has its own set of data and instructions. The cashier serves the clients in the line one by one (processes the queue items). What would happen if all the customers decided to pay simultaneously? Probably chaos.

To prevent this situation, let’s quickly implement a queue in Go :) First, we need to create a queue package with a Queue struct and a constructor function:

package queue

import (
  "context"
  "log"
  "sync"
)

// Queue holds name, list of jobs and context with cancel.
type Queue struct {
  name   string
  jobs   chan Job
  ctx    context.Context
  cancel context.CancelFunc
}

// Job - holds logic to perform some operations during queue execution.
type Job struct {
  Name   string
  Action func() error // A function that should be executed when the job is running.
}

// NewQueue instantiates a new queue.
func NewQueue(name string) *Queue {
  ctx, cancel := context.WithCancel(context.Background())

  return &Queue{
    jobs:   make(chan Job),
    name:   name,
    ctx:    ctx,
    cancel: cancel,
  }
}

As you can see above, I’ve declared an unbuffered channel (one with no capacity) to hold the Jobs in a Queue. Channels give us a really powerful way to work in a concurrent environment. Moreover, our queue has a context with cancellation. This will help us understand when all jobs have been handed over to the queue by all goroutines, and after that we can stop the worker and free the resources.
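
If the unbuffered part needs a refresher, here is a tiny standalone sketch (separate from the queue package, just for illustration) showing that a send on an unbuffered channel blocks until someone receives from it, which is exactly the behaviour AddJob will rely on below:

package main

import "fmt"

func main() {
  ch := make(chan string) // unbuffered: a send blocks until a receiver is ready
  done := make(chan struct{})

  go func() {
    ch <- "job payload" // this send waits here until main reads from ch
    fmt.Println("send completed")
    close(done)
  }()

  fmt.Println("received:", <-ch) // receiving unblocks the sender
  <-done                         // wait, so we reliably see "send completed"
}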

In the code below, we add a couple of methods to the Queue struct.

// AddJobs adds jobs to the queue and cancels the context once all of them have been added.
func (q *Queue) AddJobs(jobs []Job) {
  var wg sync.WaitGroup
  wg.Add(len(jobs))

  for _, job := range jobs {
    // Goroutine which adds job to the queue.
    go func(job Job) {
      q.AddJob(job)
      wg.Done()
    }(job)
  }

  go func() {
    wg.Wait()
    // Cancel the queue's context once all goroutines are done.
    q.cancel()
  }()
}

// AddJob sends job to the channel.
func (q *Queue) AddJob(job Job) {
  q.jobs <- job
  log.Printf("New job %s added to %s queue", job.Name, q.name)
}

Next, let’s add a Run() method to the Job struct, which should execute a job’s action (the main purpose of the job).

// Run performs job execution.
func (j Job) Run() error {
  log.Printf("Job running: %s", j.GetName())

  err := j.Action()
  if err != nil {
    return err
  }

  return nil
}
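
Just to illustrate the contract of Action in isolation, here is a small hedged example (the "say hello" job is made up and not part of the queue flow) that constructs a Job and calls Run() directly; Run simply returns whatever the Action returns:

package main

import (
  "log"

  "github.com/alexsergivan/blog-examples/queue/queue"
)

func main() {
  job := queue.Job{
    Name: "say hello",
    Action: func() error {
      log.Println("hello from a job")
      return nil
    },
  }

  // Run logs "Job running: say hello", executes the Action and returns its error (nil here).
  if err := job.Run(); err != nil {
    log.Fatal(err)
  }
}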

Alright, now let’s create a worker, which will be responsible for serving our queue.

// Worker responsible for queue serving.
type Worker struct {
  Queue *Queue
}

// NewWorker initializes a new Worker.
func NewWorker(queue *Queue) *Worker {
  return &Worker{
    Queue: queue,
  }
}

// DoWork processes jobs from the queue (jobs channel).
func (w *Worker) DoWork() bool {
  for {
    select {
    // if context was canceled.
    case <-w.Queue.ctx.Done():
      log.Printf("Work done in queue %s: %s!", w.Queue.name, w.Queue.ctx.Err())
      return true
    // if job received.
    case job := <-w.Queue.jobs:
      err := job.Run()
      if err != nil {
        log.Print(err)
        continue
      }
    }
  }
}

Pay attention to the DoWork() method. It contains a for loop that listens on the jobs channel: whenever a job is received, the worker executes it by calling job.Run(); if the job returns an error, the worker logs it and continues with the next one. Once the context is canceled, all jobs have already been handed over to the worker (and, with a single worker, executed), so we can exit the loop.
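
To see the error branch in action, here is a hedged sketch (it reuses the same queue package as the full example below; the "broken job" is invented for illustration) where a failing Action is only logged and the worker keeps serving the remaining jobs:

package main

import (
  "errors"
  "log"

  "github.com/alexsergivan/blog-examples/queue/queue"
)

func main() {
  q := queue.NewQueue("Demo")

  q.AddJobs([]queue.Job{
    {Name: "broken job", Action: func() error { return errors.New("something went wrong") }},
    {Name: "healthy job", Action: func() error { log.Println("doing useful work"); return nil }},
  })

  // The worker logs the error from "broken job" and still executes "healthy job",
  // then exits once the queue's context is canceled.
  queue.NewWorker(q).DoWork()
}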

Now, let’s see how we can use the queue functionality we’ve just created in a real example. I’m going to create a simple program that imports new products into our storage.

package main

import (
  "fmt"

  "github.com/alexsergivan/blog-examples/queue/queue"
)

// Our products storage.
var products = []string{
  "books",
  "computers",
}

func main() {
  // New products, which we need to add to our products storage.
  newProducts := []string{
    "apples",
    "oranges",
    "wine",
    "bread",
    "orange juice",
  }

  // New queue initialization.
  productsQueue := queue.NewQueue("NewProducts")

  var jobs []queue.Job

  // Range over the new products.
  for _, newProduct := range newProducts {
    // Copy the loop variable: the closure below captures the variable itself,
    // and the loop reuses it, so without the copy every closure would see the
    // last item of newProducts.
    product := newProduct

    // Define the closure that adds a new product to our simple storage (the products slice).
    action := func() error {
      products = append(products, product)
      return nil
    }

    // Append the job to the jobs slice.
    jobs = append(jobs, queue.Job{
      Name:   fmt.Sprintf("Importing new product: %s", newProduct),
      Action: action,
    })
  }

  // Add the jobs to the queue.
  productsQueue.AddJobs(jobs)

  // Define a queue worker, which will serve our queue.
  worker := queue.NewWorker(productsQueue)

  // Execute the jobs in the queue (blocks until the queue's context is canceled).
  worker.DoWork()

  // Print the products storage after the queue execution.
  fmt.Println(products)
}

After executing this program, it prints the following output:

2021/01/12 22:10:33 New job Importing new product: apples added to NewProducts queue
2021/01/12 22:10:33 Job running: Importing new product: apples
2021/01/12 22:10:33 Job running: Importing new product: wine
2021/01/12 22:10:33 Job running: Importing new product: oranges
2021/01/12 22:10:33 Job running: Importing new product: bread
2021/01/12 22:10:33 Job running: Importing new product: orange juice
2021/01/12 22:10:33 New job Importing new product: orange juice added to NewProducts queue
2021/01/12 22:10:33 New job Importing new product: oranges added to NewProducts queue
2021/01/12 22:10:33 New job Importing new product: bread added to NewProducts queue
2021/01/12 22:10:33 New job Importing new product: wine added to NewProducts queue
2021/01/12 22:10:33 Work done in queue NewProducts: context canceled!
[books computers apples wine oranges bread orange juice]

Please notice the last line in the logs: [books computers apples wine oranges bread orange juice]. As you can see, the order differs from the order of the newProducts slice. That happens because of the nature of goroutines: each job is added from its own goroutine, and the scheduler decides which one reaches the channel first, so sometimes one goroutine simply takes longer than another. For the scenario where the order is important, I will write a separate post later.
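
Until then, here is one hedged idea of how the order could be preserved with the current design. AddJobsOrdered below is a hypothetical method that is not part of the code above: it sends all jobs from a single goroutine, so the worker receives them in slice order, and it cancels the context only after the last job has been handed over.

// AddJobsOrdered is a hypothetical variant of AddJobs (not part of this post's code):
// it sends the jobs sequentially from a single goroutine, so the worker receives
// them in the same order as they appear in the slice.
func (q *Queue) AddJobsOrdered(jobs []Job) {
  go func() {
    for _, job := range jobs {
      q.AddJob(job) // blocks until the worker picks the job up
    }
    // Cancel the context only after every job has been handed to the worker.
    q.cancel()
  }()
}

With such a variant, calling productsQueue.AddJobsOrdered(jobs) before worker.DoWork() would import the products in the same order as they appear in newProducts.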

For now, that’s it :) Hope it was helpful!

You can find the source code here.
