Scalatomic!

A Reasonable Interface for Atomic Variables

Posted on April 26, 2018 by Romain Edelmann

Concurrency is hard, as Computer Science students at EPFL quickly discover when they are first introduced to it in the Parallelism and Concurrency course. The course, which takes place during the fourth Bachelor semester, uses Scala for all its code examples, lecture material, exercises and assignments, and covers several topics related to parallelism and concurrency. I’ve had the chance to be a teaching assistant for this course for several years now, and this post has been heavily influenced by my interactions with its students over the years. In this post, I will discuss one particular subject of the course, namely atomic variables, and how to design a higher-level interface that eliminates many of the traps students encounter when using them. Concurrency is hard, yes, but let’s not make it harder!

Atomic Variables

One topic covered by the Parallelism and Concurrency course is atomic operations. Over the course of one lecture and one programming assignment, students learn to use the Java class AtomicReference and some of its methods, in particular get and compareAndSet.

Written in Scala, the AtomicReference class would look something like this:

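abstract class AtomicReference[A] {
  def get(): A
  // Sets the value to `update` only if it currently is `expect`,
  // compared by reference (like Scala's `eq`), and reports whether it did.
  def compareAndSet(expect: A, update: A): Boolean
  // ... (other methods omitted)
}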

The method get atomically reads the value stored in the atomic variable, while the method compareAndSet is used to atomically update the variable. The method compareAndSet takes two arguments: (1) the expected value of the variable and (2) the desired new value.

Importantly, compareAndSet will refuse to update the variable if its first argument doesn’t correspond to the value currently held by the variable. This allows programs to properly detect and handle stale reads due to concurrent modifications. As the method may fail, it returns a boolean indicating whether or not the update happened.
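
As a minimal illustration of these semantics, the sketch below uses AtomicInteger, the int counterpart of AtomicReference in java.util.concurrent.atomic, which offers the same get and compareAndSet methods but compares plain int values. CasDemo is only an illustrative name.

```scala
import java.util.concurrent.atomic.AtomicInteger

object CasDemo {
  val counter = new AtomicInteger(10)

  // Read once and keep the value: it is the expected value for later updates.
  val seen: Int = counter.get()

  // Succeeds: the variable still holds 10, so it is set to 11.
  val first: Boolean = counter.compareAndSet(seen, 11)

  // Fails: the variable now holds 11, so the stale expected value 10 is
  // rejected and the variable is left unchanged.
  val second: Boolean = counter.compareAndSet(seen, 12)
}
```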

As an example, here is how one could, using atomic primitives, write a procedure to withdraw money from a bank account:

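import scala.annotation.tailrec
import java.util.concurrent.atomic.AtomicInteger

// AtomicInteger rather than AtomicReference[Int]: AtomicReference.compareAndSet
// compares references, and re-boxing an Int outside Integer's small cache yields a
// fresh object, so with AtomicReference[Int] the update below could fail forever.
@tailrec
def withdraw(account: AtomicInteger, amount: Int): Boolean = {
  val balance = account.get()
  if (amount <= balance) {
    val updatedBalance = balance - amount
    if (account.compareAndSet(balance, updatedBalance)) {
      true  // Withdrawal successful.
    }
    else {
      withdraw(account, amount)  // Stale read, let's retry.
    }
  }
  else {
    false  // Not enough money on the account.
  }
}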

The Pattern

Procedures that manipulate atomic variables, such as the withdraw function above, generally follow the same pattern:

  • The atomic variables are read and their value stored in local variables,
  • Some actions are taken based on the local variables, possibly resulting in updated values for the variables,
  • The atomic variables are updated with compareAndSet, passing the values read earlier as the expected values,
  • In case a call to compareAndSet fails, extra actions are taken (such as, for instance, restarting the procedure).

However, this pattern is not enforced – or even encouraged! – by the low-level API provided by the java.util.concurrent.atomic package.

The Errors

In my experience, students using atomic variables often make the same kind of mistakes:

  • Instead of reading atomic variables once and storing the results in local variables, students often read the atomic variables multiple times, leading to consistency problems,
  • They call compareAndSet incorrectly, for instance by using a “fresh” read of the variable as the expected value,
  • Finally, they often forget to handle the result of compareAndSet.
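
The second and third mistakes can be replayed deterministically in a single thread by simulating the interleaving by hand. In this hypothetical sketch (StaleReadDemo and run are illustrative names), another thread withdraws 80 from an account holding 100 between our read and our compareAndSet, while we try to withdraw 50:

```scala
import java.util.concurrent.atomic.AtomicInteger

object StaleReadDemo {
  // Returns whether compareAndSet succeeded, together with the final balance.
  def run(freshExpected: Boolean): (Boolean, Int) = {
    val account = new AtomicInteger(100)
    val balance = account.get()          // our read: 100
    account.addAndGet(-80)               // simulated concurrent withdrawal: 20 left
    // The mistake: re-reading the variable instead of reusing `balance`.
    val expected = if (freshExpected) account.get() else balance
    val ok = account.compareAndSet(expected, balance - 50)
    (ok, account.get())
  }
}
```

With a fresh read as the expected value, compareAndSet succeeds and the balance jumps from 20 to 50, creating money out of thin air. Reusing the value read earlier makes compareAndSet fail and leaves the balance at 20; ignoring that false result, the third mistake, would silently lose the withdrawal.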

All those errors can be easily prevented by following the programming pattern previously described. From this observation, I designed a Scala library that offers an interface based on this pattern. Introducing…

Scalatomic

Scalatomic is an open source Scala library designed to make the use of atomic variables less error prone. The library is available on GitHub. The library introduces the type Atomic[A], which represents an atomically modifiable variable of type A. Values of type Atomic[A] can be created as follows:

val atomicVar: Atomic[Int] = newAtomic(42 /* Initial value */)

The only argument of newAtomic is the initial value stored in the variable.

Values stored in atomic variables can be read using the read method:

val currentValue: Int = atomicVar.read

The modify Method

The main way to interact with an Atomic[A] is through its modify method, which takes a single function as its argument. The current value of the atomic variable is passed to this function, which in turn returns either an updated value of type A (Update) or no update at all (NoUpdate), together with a result value of some type R (withResult).

The method modify returns values of type Result[R], which either represent an actual value of type R, or a failure due to concurrent modifications. The method onFailure should be called on the result of modify to handle those potential failures. Together, the methods modify and onFailure precisely implement the pattern we have just discussed.

As an example, here is how one could write the withdraw procedure using scalatomic:

import scalatomic._

def withdraw(account: Atomic[Int], amount: Int): Boolean =
  account modify { (balance: Int) =>
    //                 ^ The current balance is received as an argument.
    if (amount <= balance) {
      // We issue a request to update the variable, with a `true` result.
      Update(balance - amount) withResult true
    }
    else {
      // We do not update the variable, and want to have `false` as result.
      NoUpdate withResult false
    }
  } onFailure {
    // In case of a concurrent modification, we wish to retry.
    retry
  }

The modify / onFailure operation has been designed to be less error prone than primitive atomic operations:

  • As the current value of the atomic variable is provided by modify to its parameter function, users of the library do not have to manually read the variable. The value read from the atomic variable is directly stored as a local variable, which users can easily access. This makes it less likely that users perform multiple incoherent reads.
  • Programmers also don’t need to manually call compareAndSet and pass it the correct arguments, as the library handles this for them.
  • The Scala type system ensures that onFailure is called, since the result of modify is wrapped in the Result class [1], making it impossible to forget to handle concurrent modifications.
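
To see why this shape rules out the mistakes above, here is a deliberately simplified toy model of modify and onFailure built directly on java.util.concurrent.atomic.AtomicReference. It is not Scalatomic's implementation: ToyAtomic, Success, Failure and the Option standing in for Update/NoUpdate are assumptions made for this sketch, and retrying is expressed by plain recursion instead of retry.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical toy model of the modify/onFailure pattern, NOT Scalatomic's code.
object ToyAtomic {
  // Outcome of modify: a result, or a failure caused by a concurrent modification.
  sealed trait Result[+R] {
    // The only way to obtain the R: state what to do in case of failure.
    def onFailure[S >: R](handler: => S): S
  }
  final case class Success[+R](value: R) extends Result[R] {
    def onFailure[S >: R](handler: => S): S = value
  }
  case object Failure extends Result[Nothing] {
    def onFailure[S](handler: => S): S = handler
  }

  final class Atomic[A](initial: A) {
    private val ref = new AtomicReference[A](initial)

    def read: A = ref.get()

    // `f` receives the current value and returns (Some(newValue) or None, result).
    def modify[R](f: A => (Option[A], R)): Result[R] = {
      val current = ref.get()              // exactly one read
      val (update, result) = f(current)
      update match {
        case None => Success(result)       // nothing to write
        case Some(newValue) =>
          // `current` is the very reference we read, so the comparison is sound.
          if (ref.compareAndSet(current, newValue)) Success(result)
          else Failure                     // someone wrote in between
      }
    }
  }

  // The withdraw example in this toy model, retrying by recursion on failure.
  def withdraw(account: Atomic[Int], amount: Int): Boolean =
    account.modify { (balance: Int) =>
      if (amount <= balance) (Some(balance - amount), true)
      else (None, false)
    }.onFailure(withdraw(account, amount))
}
```

Callers only see the current value as a function argument and never call compareAndSet themselves, so they have no reason to re-read the variable or to pick an expected value, and the result can only be extracted through onFailure.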

Automatic Versioning

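To prevent the ABA problem, in which a compareAndSet succeeds even though the variable was modified in the meantime, because its value went from A to B and then back to A, Scalatomic can tag values stored in atomic variables with a version number. Version tagging is enabled when the atomic variable is created: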

// Creates a versioned atomic variable.
val atomicVar: VersionedAtomic[Int] = newVersionedAtomic(42)

Versioned atomic variables support the following additional methods:

  • readWithVersion, which reads the value stored and its version number,
  • readVersion, which only reads the version number,
  • modifyWithVersion, which also passes the version number to the higher-order function given as argument.
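
The problem that versioning guards against can be reproduced with plain Java atomics. The sketch below (AbaDemo is an illustrative name, and this is not how Scalatomic is implemented) simulates a concurrent A to B to A change between a read and a compareAndSet. A plain AtomicReference cannot tell that anything happened, whereas the JDK's AtomicStampedReference, which pairs the reference with an int stamp that writers bump on every update, rejects the stale update.

```scala
import java.util.concurrent.atomic.{AtomicReference, AtomicStampedReference}

object AbaDemo {
  private val a = "A"
  private val b = "B"

  // Plain reference: only the current reference is compared.
  def plainCasSucceeds(): Boolean = {
    val ref = new AtomicReference[String](a)
    val seen = ref.get()              // we read A
    ref.set(b); ref.set(a)            // simulated concurrent A -> B -> A
    ref.compareAndSet(seen, "C")      // true: the two writes go unnoticed
  }

  // Stamped reference: reference and version number are compared together.
  def stampedCasSucceeds(): Boolean = {
    val ref = new AtomicStampedReference[String](a, 0)
    val stampHolder = new Array[Int](1)
    val seen = ref.get(stampHolder)   // reads A and its stamp atomically
    val seenStamp = stampHolder(0)    // 0
    ref.set(b, seenStamp + 1)         // simulated concurrent writes,
    ref.set(a, seenStamp + 2)         // each one bumping the stamp
    ref.compareAndSet(seen, "C", seenStamp, seenStamp + 1) // false: stamp is now 2
  }
}
```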

Restarting

Procedures that use atomic variables usually restart from a specific point when an atomic operation fails. To make this easy, Scalatomic introduces retry, restartable and restart.

  • The function retry can be used to immediately retry a modify operation from the start. It should only be called within modify or its associated onFailure.
  • The function restartable delimits a restartable block.
  • The function restart immediately restarts the closest restartable block.
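
One plausible way to build restartable and restart, shown here only as a hypothetical sketch (ToyRestart and RestartSignal are made-up names, and Scalatomic may well work differently), is a control-flow exception: restart throws it, and the innermost enclosing restartable catches it and re-runs its body.

```scala
import scala.util.control.ControlThrowable

object ToyRestart {
  // Control-flow signal: carries no data and, as a ControlThrowable, no stack trace.
  final class RestartSignal extends ControlThrowable

  private val signal = new RestartSignal

  // Abort the current attempt of the innermost enclosing `restartable`.
  def restart(): Nothing = throw signal

  // Evaluate `body`, starting over each time it calls `restart()`.
  def restartable[A](body: => A): A = {
    var result: Option[A] = None
    while (result.isEmpty) {
      try result = Some(body)
      catch { case _: RestartSignal => () } // try again from the top
    }
    result.get
  }
}
```

Using ControlThrowable matters if the restartable code contains error handlers: scala.util.control.NonFatal does not match it, so handlers written as `case NonFatal(e)` let the signal propagate to restartable instead of swallowing it.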

Applicability

To evaluate the applicability of the library to real-world problems, I have used Scalatomic to write various lock-free algorithms based on compareAndSet. Below is an implementation of a lock-free queue, adapted from the paper Simple, fast, and practical non-blocking and blocking concurrent queue algorithms [2] by Maged M. Michael and Michael L. Scott. It is arguably simpler to understand than the original version, and was relatively easy to write.

import scala.annotation.tailrec
import scalatomic._

sealed abstract class Node[A] {
  val nextPointer: VersionedAtomic[Option[Node[A]]]
  def value: A
}

class ValueNode[A](override val value: A, nextNode: Option[Node[A]]) extends Node[A] {
  override val nextPointer = newVersionedAtomic(nextNode)
}

class EmptyNode[A]() extends Node[A] {
  override val nextPointer = newVersionedAtomic[Option[Node[A]]](None)
  override def value = throw new UnsupportedOperationException("No value for empty nodes.")
}

class LockFreeQueue[A] {

  // Head and last initially point to the same (sentinel) node.
  private val (head, last) = {
    val sentinelNode: Node[A] = new EmptyNode()
    (newVersionedAtomic(sentinelNode), newVersionedAtomic(sentinelNode))
  }

  def toSeq: Seq[A] = {
    @tailrec
    def nodesToSeq(node: Node[A], acc: Seq[A]): Seq[A] = {
      val newAcc: Seq[A] = acc :+ node.value
      node.nextPointer.read match {
        case Some(nextNode) => nodesToSeq(nextNode, newAcc)
        case None => newAcc
      }
    }

    head.read.nextPointer.read match {
      case None => Seq()
      case Some(firstNode) => nodesToSeq(firstNode, Seq())
    }
  }

  def enqueue(value: A): Unit = {
    val newNode = new ValueNode(value, None)
    restartable {
      val successful = last modifyWithVersion { (lastNode, lastVersion) =>
        val (candidateLast, successful) = lastNode.nextPointer modify { (optNextNode) =>

          if (lastVersion != last.readVersion) {
            // Possible inconsistent read.
            restart
          }

          if (optNextNode.nonEmpty) {
            // We don't modify the next pointer of the last node: `last` is lagging
            // behind, so we return its successor as the candidate new value of `last`.
            // We also indicate that the insertion was not successful.
            NoUpdate withResult ((optNextNode.get, false))
          }
          else {
            // We try to set the next pointer of the last node to the new node.
            // We also indicate that `last` should then be advanced to the new node,
            // and that the insertion itself has succeeded once this update goes through.
            Update(Some(newNode)) withResult ((newNode, true))
          }
        } onFailure {
          // We were not able to modify any state at all, let's retry.
          restart
        }

        Update(candidateLast) withResult successful
      } onFailureWithResult { case successful =>
        // We simply indicate if we were successful or not in the insertion.
        successful
      }

      if (!successful) {
        restart
      }
    }
  }

  def dequeue(): Option[A] = restartable {
    head modifyWithVersion { (headNode, headVersion) =>
      val optNext = last modify { (lastNode) =>

        // Read the successor of the head node.
        val next = headNode.nextPointer.read

        if (head.readVersion != headVersion) {
          // Inconsistent reads possible.
          restart
        }

        if (headNode eq lastNode) {
          // Either the queue is empty or `last` is lagging behind.
          if (next.isEmpty) {
            // Queue is empty.
            // Abort all modifications and directly return.
            return None
          }
          else {
            // Last is lagging behind.
            Update(next.get) withResult None
          }
        }
        else {
          NoUpdate withResult next
        }
      } onFailure {
        None
      }

      if (optNext.isEmpty) {
        restart
      }

      Update(optNext.get) withResult Some(optNext.get.value)
    } onFailure {
      retry
    }
  }
}
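
For comparison, here is a sketch of the same algorithm written directly against java.util.concurrent.atomic.AtomicReference, without Scalatomic. MSQueue is a hypothetical name, and this is an illustrative translation of the paper's pseudocode, not code from the library. It drops the paper's version counters, on the usual assumption for garbage-collected runtimes that a node is never reused while any thread still references it: every enqueue allocates a fresh node, so the ABA scenario the counters guard against cannot arise on these pointers.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

final class MSQueue[A] {
  // A list node; `next` stays null until a successor is linked in.
  private final class Node(val value: A) {
    val next = new AtomicReference[Node](null)
  }

  // Head and tail initially point to the same sentinel node, whose value is never read.
  private val sentinel = new Node(null.asInstanceOf[A])
  private val head = new AtomicReference[Node](sentinel)
  private val tail = new AtomicReference[Node](sentinel)

  def enqueue(value: A): Unit = {
    val node = new Node(value)
    @tailrec def loop(): Unit = {
      val last = tail.get()
      val next = last.next.get()
      if (last ne tail.get()) loop()            // inconsistent read: retry
      else if (next ne null) {
        tail.compareAndSet(last, next)          // tail is lagging: help advance it
        loop()
      } else if (last.next.compareAndSet(null, node)) {
        tail.compareAndSet(last, node)          // swing tail; fine if another thread did
        ()
      } else loop()                             // lost the race to link a node: retry
    }
    loop()
  }

  def dequeue(): Option[A] = {
    @tailrec def loop(): Option[A] = {
      val first = head.get()
      val last = tail.get()
      val next = first.next.get()
      if (first ne head.get()) loop()           // inconsistent read: retry
      else if (next eq null) None               // only the sentinel is left: empty
      else if (first eq last) {
        tail.compareAndSet(last, next)          // tail is lagging: help advance it
        loop()
      } else {
        val value = next.value                  // read before head moves past `first`
        if (head.compareAndSet(first, next)) Some(value) else loop()
      }
    }
    loop()
  }
}
```

Each helping step, such as advancing a lagging tail, mirrors a branch of the Scalatomic version above; the difference is that every read, consistency re-check and compareAndSet is spelled out by hand.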

Comparison to STM

The library might remind you [3] of Software Transactional Memory (STM), which is the underlying concept behind libraries such as ScalaSTM. While they appear similar on the surface, there are fundamental differences:

  • STM is a higher-level abstraction based on the idea of transactions. In STM systems, transactions are automatically rolled back and retried in case of concurrent modifications. For this reason, code executed in a transaction should have limited side effects.
  • Scalatomic is a very lightweight interface to atomic variables. The concept of a transaction is not present in Scalatomic: in case of concurrent modifications, it is the responsibility of users to specify what should be done. This flexibility is useful for implementing some algorithms and data structures, such as the lock-free queue example presented in this article.

Conclusion

In this post, I have presented the Scalatomic library, which aims to provide a reasonable interface for manipulating atomic variables. The library was motivated by the mistakes I have seen students make in a Bachelor course on concurrency. Those mistakes can generally be attributed to deviations from a common pattern. Scalatomic introduces the methods modify and onFailure, amongst others, which closely follow this pattern. I have also shown that the library can be used to implement non-trivial lock-free data structures and algorithms.


  1. Unfortunately, this only holds when the result is used at a non-Unit type: where Scala expects Unit, it silently discards values of any other type, including an unhandled Result. The flag -Ywarn-value-discard (-Wvalue-discard in Scala 2.13 and later) instructs the Scala compiler to emit a warning in this case.

  2. Michael, M. M., & Scott, M. L. (1996, May). Simple, fast, and practical non-blocking and blocking concurrent queue algorithms. In Proceedings of the fifteenth annual ACM symposium on Principles of distributed computing (pp. 267-275). ACM.

  3. Thanks to sciss_ for raising this relevant point during the Reddit discussion.

Comments

This post has been discussed on Reddit.