This post takes the minimal Scala wrapper for UIMA from the previous post and extends it to make use of UIMA Asynchronous Scaleout (UIMA-AS), an extension to UIMA that allows for multicore/distributed analyses. UIMA-AS requires a lot of XML configuration files, which we can generate programmatically for convenience.

UIMA-AS offers some of the same affordances as uimaFIT for programmatic configuration, but we can no longer use many of uimaFIT’s convenience methods (including iteratePipeline). No matter, we’ll write our own!

UIMA App Context

The most basic runtime configuration is stored in a UIMA Application Context, which in the UIMA-AS Java examples is represented as a java.util.Map<String, Object>. For our wrapper, we can create a case class containing all the necessary information with richer types than the original, and a method toMap that will translate it into the Java form.

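import java.net.{URI, URL}

import org.apache.uima.resourceSpecifier.factory.SerializationStrategy

import scala.language.implicitConversions // for the implicit asMap conversion below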

case class UimaAppContext(
  dd2SpringXsltFilePath: URL = getClass.getClassLoader.getResource("dd2spring.xsl"),
  saxonClasspath: URL = getClass.getClassLoader.getResource("saxon8.jar"),
  serverUri: URI = new URI("tcp://localhost:61616"),
  endpoint: String = "uimaAS",
  casPoolSize: Int = 2,
  casInitialHeapSize: Int = 500000, // number of 4-byte words = 2000000 bytes (2 MB)
  applicationName: String = "",
  getMetaTimeout: Int = 60000,
  timeout: Option[Int] = None,
  cpcTimeout: Option[Int] = None,
  serializationStrategy: SerializationStrategy = SerializationStrategy.binary
) {
  def toMap: java.util.Map[String, Object] = ???
}

object UimaAppContext {
  implicit def asMap(appCtx: UimaAppContext): java.util.Map[String, Object] = appCtx.toMap
}

The Option[Int] fields will be omitted from the config map if they are None, signifying an unlimited timeout. All timeouts are specified in milliseconds.

Most of these fields can be left as their defaults. Perhaps the most important is casPoolSize, which determines how many documents will be processed concurrently.
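
The body of toMap is left as ??? above. As a rough sketch of how the Option fields could be dropped when they are None, here is a cut-down stand-in that runs without UIMA on the classpath. MiniAppContext and the key strings are placeholders invented for this illustration; the real wrapper would presumably use the key constants that UIMA-AS defines.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

// Cut-down stand-in for UimaAppContext (hypothetical, for illustration only).
case class MiniAppContext(
  serverUri: String = "tcp://localhost:61616",
  endpoint: String = "uimaAS",
  casPoolSize: Int = 2,
  timeout: Option[Int] = None,    // milliseconds; None = no entry in the map
  cpcTimeout: Option[Int] = None  // milliseconds; None = no entry in the map
) {
  def toMap: JMap[String, Object] = {
    val m = new JHashMap[String, Object]()
    // Placeholder keys; assumed, not taken from the UIMA-AS API.
    m.put("ServerUri", serverUri)
    m.put("Endpoint", endpoint)
    m.put("CasPoolSize", Int.box(casPoolSize))
    // Only defined timeouts are written into the map.
    timeout.foreach(t => m.put("Timeout", Int.box(t)))
    cpcTimeout.foreach(t => m.put("CpcTimeout", Int.box(t)))
    m
  }
}
```

With this shape, MiniAppContext(timeout = Some(5000)).toMap contains a Timeout entry but no CpcTimeout entry.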

Deployment Configuration

Using the UimaAppContext as an input, we can now create a case class for the complete deployment configuration. Most of its settings come from the UimaAppContext, while some others must be specified in addition.

case class UimaAsyncDeploymentConfig(
    engineDescs: Seq[AnalysisEngineDescription],
    appCtx: UimaAppContext = UimaAppContext(),
    name: String = "uima-service",
    description: String = "",
    descriptor: String = "",
    async: Boolean = true) {
  def toXML(): String = ???
}

Because UIMA-AS requires the configuration files to exist on disk, the toXML method should write its configuration to a file (a temp file, in most cases) and return the path to that file.
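
The write-to-disk step could look something like the following. This is only a sketch of the temp-file handling: DescriptorFiles and writeTemp are hypothetical names, and generating the deployment descriptor XML itself is not shown.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

object DescriptorFiles {
  // Write the given XML to a fresh temp file and return its absolute path.
  def writeTemp(xml: String, prefix: String = "uima-deploy-"): String = {
    val path = Files.createTempFile(prefix, ".xml")
    path.toFile.deleteOnExit() // remove the file when the JVM exits
    Files.write(path, xml.getBytes(StandardCharsets.UTF_8))
    path.toAbsolutePath.toString
  }
}
```

Returning the path as a String matches the toXML signature above, at the cost of a method name that suggests it returns the XML itself.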

How to Build an Analysis Pipeline

The real benefits of UIMA-AS become apparent when some operations can run in parallel, but specifying the flow can be somewhat complicated. For now, we’ll reuse the Process abstraction from the prior post, which simply stores a sequence of AnalysisEngineDescriptions to be run one after another.

One unfortunate consequence of switching to asynchronous processing in UIMA is that the convenient Iterator[JCas] interface offered by uimaFIT is no longer usable. By the nature of UIMA-AS, a callback will receive completed CASes in an arbitrary order. Further, just as in synchronous UIMA processing, the results of an analysis are only available while looping over the iterator, since the CAS object is reused to store each document in turn. Therefore, we need to change our interface so that we specify how to collect our desired results at call time, rather than getting an Iterator to be used later.

To resolve this issue, we will pass around the means to collect and process the results of each analysis as a function JCas => T. In practice, we will often want to know which document the result came from, so we ultimately return a Map[String, T] from our analysis functions (where the String is a unique identifier, drawn from the document title in the JCas metadata).
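
To make the shape of that interface concrete, here is a small sketch that runs without UIMA. Doc stands in for JCas, and the Block and Results aliases are guesses at what Util.Block and Util.Results in this post might look like, not their actual definitions.

```scala
// Stand-in for JCas so the sketch runs without UIMA on the classpath.
final case class Doc(title: String, text: String)

object BlockSketch {
  // Hypothetical aliases; the real Util.Block / Util.Results may differ.
  type Block[T] = Doc => (String, T)
  type Results[T] = Map[String, T]

  // Pair the user's function with the document title, which serves as the key.
  def wrapBlock[T](f: Doc => T): Block[T] = doc => (doc.title, f(doc))

  // Fold finished documents, in whatever order they arrive, into one map.
  def collect[T](finished: Seq[Doc], block: Block[T]): Results[T] =
    finished.map(block).toMap
}
```

Because the results are keyed by title, the order in which documents finish does not affect the final map.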

Managing the UIMA-AS Lifecycle

Since UIMA-AS is designed to delegate its analyses to remote machines, a bit of ceremony is needed to start up and shut down its constituent services even when we’re only running multiple engines on one machine. We’ll define a UimaAsync class to manage the process, and store an app-wide broker connection in the companion object.

import org.apache.activemq.broker.BrokerService
import org.apache.uima.aae.client.UimaAsynchronousEngine
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl

import scala.concurrent.Future

class UimaAsync(val config: UimaAppContext = UimaAppContext()) {
  import uimaAS.UimaAppContext._

  val engine: UimaAsynchronousEngine = new BaseUIMAAsynchronousEngine_impl
  var springContainerId: Option[String] = None

  def start[T](corpus: Corpus, process: Process, block: Util.Block[T], collectionTotal: Option[Int] = None): Future[Util.Results[T]] = ???
  
  def stop(): Unit = ???
}

object UimaAsync {
  val uri = UimaAppContext().serverUri
  var broker: Option[BrokerService] = None

  def start(): Unit = {
    if (broker.isEmpty) {
      val brokerService = new BrokerService
      brokerService.setBrokerName("localhost")
      brokerService.setUseJmx(false)
      brokerService.addConnector(uri.toString)
      brokerService.start()
      broker = Some(brokerService)
    }
  }

  def stop(): Unit = {
    broker.foreach(_.stop())
    broker = None // allow a later start() to create a fresh broker
  }
}
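
Since every start needs a matching stop, one option is to wrap the ceremony in a small loan-pattern helper. The sketch below is not part of the repo; Lifecycle and withService are hypothetical names, and the helper only guarantees that stop runs after the body, even when the body throws.

```scala
object Lifecycle {
  // Run `body` between start() and stop(); stop() runs even if body throws.
  def withService[A](start: () => Unit, stop: () => Unit)(body: => A): A = {
    start()
    try body
    finally stop()
  }
}
```

With the companion object above, a call might look like Lifecycle.withService(() => UimaAsync.start(), () => UimaAsync.stop()) { ... }, with the analysis itself in the braces. A body that returns a Future would need to wait for it before the broker is stopped.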

Running an Analysis

The other important piece is a callback listener, which collects the results to be returned and confirms that all documents have been processed.
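
The listener itself is not shown in this post. As a sketch of the idea, the class below collects one result per document and completes a Future once the expected number of documents has arrived. It deliberately avoids the UIMA-AS types: CollectingListener and entityComplete are hypothetical names, and a real listener would hook into the UIMA-AS callback interface instead.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Future, Promise}

// Hypothetical listener; the method name is illustrative, not the UIMA-AS API.
class CollectingListener[T](expectedTotal: Int) {
  private val results = TrieMap.empty[String, T]
  private val finished = new AtomicInteger(0)
  private val promise = Promise[Map[String, T]]()

  // An empty corpus is complete immediately.
  if (expectedTotal <= 0) promise.trySuccess(Map.empty[String, T])

  def future: Future[Map[String, T]] = promise.future

  // Called once per finished document, possibly from several threads.
  def entityComplete(id: String, result: T): Unit = {
    results.update(id, result)
    // The call that brings the count up to the total publishes the results.
    if (finished.incrementAndGet() == expectedTotal) {
      promise.trySuccess(results.toMap)
      ()
    }
  }
}
```

The expected total plays the role that the collectionTotal parameter of UimaAsync.start appears to play; that reading of the parameter is an assumption.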

With all these elements in place, we can finally run an analysis!

First, set up the corpus and the analysis pipeline:

val corpus = Corpus.fromDir("dir_with_txt_files")
val lemmatize =
  Process(
    createEngineDescription(classOf[ClearNlpSegmenter]),
    createEngineDescription(classOf[ClearNlpPosTagger]),
    createEngineDescription(classOf[ClearNlpLemmatizer]))

And then run the pipeline asynchronously:


UimaAsync.start() // start the ActiveMQ broker
val uimaAS = new UimaAsync()

val getLemmas = { (jcas: JCas) =>
  jcas.select(classOf[Lemma]).take(5).map(_.getValue).toVector
}

val results: Future[Map[String, Vector[String]]] = uimaAS.start(corpus, lemmatize, Process.wrapBlock(getLemmas))

results onSuccess { case allLemmas =>
  allLemmas("1789-Washington.txt") shouldBe Vector("fellow", "-", "citizen", "of", "the")
}

Note that a blocking runMultiThread method has been added to the Process class for convenience; with this, the usage becomes:

val allLemmas = lemmatize.runMultiThread(corpus) { jcas =>
  jcas.select(classOf[Lemma]).take(5).map(_.getValue).toVector
}

allLemmas("1789-Washington.txt") shouldBe Vector("fellow", "-", "citizen", "of", "the")
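
The implementation of runMultiThread is not shown here. One plausible way to get a blocking variant is simply to wait on the Future that the asynchronous call returns, as in this sketch. Blocking.await is a hypothetical helper, not the method from the repo.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

object Blocking {
  // Block the calling thread until the asynchronous results are ready.
  // Duration.Inf waits indefinitely; pass a finite Duration to fail fast.
  def await[T](results: Future[Map[String, T]],
               atMost: Duration = Duration.Inf): Map[String, T] =
    Await.result(results, atMost)
}
```

Blocking like this is convenient in tests and scripts, though it gives up the main benefit of returning a Future in the first place.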

Complete demo

Using UIMA-AS required a lot of code in the end, so it’s not easy to fit into a concise post. The complete repo with tests is available at: https://github.com/corajr/dkpro-scala-example-2

To run the tests on this repo: ./activator test

Ultimately, while the comparative standardization and wide range of analysis engines available for UIMA are attractive, UIMA feels a bit cumbersome to use from Scala. Other libraries, like Spark CoreNLP, might be a better fit for most applications.