A Simple Queue Implementation in Golang with channels
12 Jan 2021

A Simple Queue Implementation in Golang with channels

In this post, I’m going to show the way how we can implement a simple queue in Golang, using channels.

Let’s clarify why would we ever use a queue? There many possible reasons. The most common one it’s when we have a list of operations (actions) and we need to process them one by one. This list either could be static or dynamic (when new items arriving continuously). As in the real-life, queue needed to be processed by something or someone. Imagine, a line of people in the supermarket nearby cashier.

Every customer in this line holds his own specific products and wants to pay for them. In other words, every queue item has it own set of data with instructions. A cashier serves his clients from a line one by one (processes queue items). What would happen if all customers decided to pay simultaneously? Probably, something like this:

To prevent this situation, let’s implement a queue in Go quickly :) First, we need to create a Queue struct and constructor function:

  // Queue holds name, list of jobs and context with cancel.
  type Queue struct {
     name   string
     jobs   chan Job
     ctx    context.Context
     cancel context.CancelFunc
  }

  // Job - holds logic to perform some operations during queue execution.
  type Job struct {
    Name   string
    Action func() error // A function that should be executed when the job is running.
  }

  // NewQueue instantiates new queue.
  func NewQueue(name string) *Queue {
    ctx, cancel := context.WithCancel(context.Background())

    return &Queue{
       jobs:   make(chan Job),
       name:   name,
       ctx:    ctx,
       cancel: cancel,
    }
  }

As you have seen above, I’ve declared an unbuffered channel (with no capacity) to hold Jobs in a Queue. Channels will give us a really powerful possibility to work in a concurrent environment. Moreover, our queue has a context with cancellation. This will help us to understand when all jobs in the queue were finished in all goroutines and after that, we can free all resources.

In the code below we are going to add some methods to the queue struct.

// AddJobs adds jobs to the queue and cancels channel.
func (q *Queue) AddJobs(jobs []Job) {
  var wg sync.WaitGroup
  wg.Add(len(jobs))

  for _, job := range jobs {
    // Goroutine which adds job to the queue.
    go func(job Job) {
      q.AddJob(job)
      wg.Done()
    }(job)
  }

  go func() {
    wg.Wait()
    // Cancel queue channel, when all goroutines were done.
    q.cancel()
  }()
}

// AddJob sends job to the channel.
func (q *Queue) AddJob(job Job) {
  q.jobs <- job
  log.Printf("New job %s added to %s queue", job.Name, q.name)
}

Next, let’s add a Run() method to the Job struct, which should execute a job’s action (the main purpose of the job).

// Run performs job execution.
func (j Job) Run() error {
  log.Printf("Job running: %s", j.GetName())

  err := j.Action()
  if err != nil {
    return err
  }

  return nil
}

Alright, now let’s create a worker, who will be responsible for serving our queue.

// Worker responsible for queue serving.
type Worker struct {
  Queue *Queue
}

// NewWorker initializes a new Worker.
func NewWorker(queue *Queue) *Worker {
  return &Worker{
    Queue: queue,
  }
}

// DoWork processes jobs from the queue (jobs channel).
func (w *Worker) DoWork() bool {
  for {
    select {
      // if context was canceled.
      case <-w.Queue.ctx.Done():
        log.Printf("Work done in queue %s: %s!", w.Queue.name, w.Queue.ctx.Err())
        return true
      // if job received.
      case job := <-w.Queue.jobs:
        err := job.Run()
        if err != nil {
          log.Print(err)
          continue
        }
    }
  }
}

Pay attention to the DoWork() method. There we have a for loop, which is listening for the jobs channel, and if the channel receives a job, it executes it, by running job.Run(). If the context was canceled, it means, that all jobs were executed and we can exit from the loop.

Now, let’s see how we can work with just created queue functionality on a real example. I’m going to create a simple program, where we need to import new products into our storage.

package main

import (
	"fmt"
	"github.com/alexsergivan/blog-examples/queue/queue"
)

// Our products storage.
var products = []string{
	"books",
	"computers",
}

func main()  {
    // New products, which we need to add to our products storage.
	newProducts := []string{
		"apples",
		"oranges",
		"wine",
		"bread",
		"orange juice",
	}
    // New queue initialization.
    productsQueue := queue.NewQueue("NewProducts")
	var jobs []queue.Job

    // Range over new products.
	for _, newProduct := range newProducts {
        // We need to do this, because variables declared in for loops are passed by reference.
        // Otherwise, our closure will always receive the last item from the newProducts.
        product := newProduct
        // Defining of the closure, where we add a new product to our simple storage (products slice)
		action := func() error {
			products = append(products, product)
			return nil
        }
        // Append job to jobs slice.
		jobs = append(jobs, queue.Job{
			Name:   fmt.Sprintf("Importing new product: %s", newProduct),
			Action: action,
		})
	}

    // Adds jobs to the queue.
	productsQueue.AddJobs(jobs)

    // Defines a queue worker, which will execute our queue.
    worker := queue.NewWorker(productsQueue)
    // Execute jobs in queue.
    worker.DoWork()

    // Prints products storage after queue execution.
	defer fmt.Print(products)
}

After executing this program, it will print out this data:

2021/01/12 22:10:33 New job Importing new product: apples added to NewProducts queue
2021/01/12 22:10:33 Job running: Importing new product: apples
2021/01/12 22:10:33 Job running: Importing new product: wine
2021/01/12 22:10:33 Job running: Importing new product: oranges
2021/01/12 22:10:33 Job running: Importing new product: bread
2021/01/12 22:10:33 Job running: Importing new product: orange juice
2021/01/12 22:10:33 New job Importing new product: orange juice added to NewProducts queue
2021/01/12 22:10:33 New job Importing new product: oranges added to NewProducts queue
2021/01/12 22:10:33 New job Importing new product: bread added to NewProducts queue
2021/01/12 22:10:33 New job Importing new product: wine added to NewProducts queue
2021/01/12 22:10:33 Work done in queue NewProducts: context canceled!
[books computers apples wine oranges bread orange juice]%

Please notice the last line in the logs: [books computers apples wine oranges bread orange juice] As you can see, the order is different from the order in newProducts slice. It happens because of goroutines nature. Sometimes one goroutine needs more time to finish its work than another. For the scenario, when the order is important, I will write a separate post later.

For now, that’s it :) Hope it was helpful!

The source code you can find here

Related posts

How to Sort Strings With Go Alphabetically in Any Language
4 Jan 2021

How to Sort Strings With Go Alphabetically in Any Language

In this article I’m going to show how easy we can sort strings alphabetically in different languages, using Go. It seems like an easy task if we want to sort English words, however, it’s not so trivial if we want to sort correctly strings with special characters or in other languages, i.e Cyrillic based.

go sorting alphabetical sort
How to Control Router Access Permissions in Go Web Apps
23 Dec 2020

How to Control Router Access Permissions in Go Web Apps

In this post I’m going to describe how can we limit user access to the specific url in golang web application. I will use chi router - a lightweight, idiomatic and composable router for building Go HTTP services.

go router chi
How to test database interactions in golang applications
22 Dec 2020

How to test database interactions in golang applications

Testing of functions with database interactions always was challenging. Recently, I found a wonderful library go-sqlmock which will simplify writing tests and mocking database queries in golang applications a lot.

go testing sql
Go middleware example. How to alter a handler result
20 Dec 2020

Go middleware example. How to alter a handler result

Let’s imagine a situation when you want to alter the result, returned by some http handler to the client. Fortunately, Golang provides an easy mechanism for that, called a middleware. I’m going to dive directly to the source code, to save your time.

go middleware