This post takes the minimal Scala wrapper for UIMA from the previous post and extends it to make use of UIMA Asynchronous Scaleout (UIMA-AS), an extension to UIMA that allows for multicore/distributed analyses. UIMA-AS requires a lot of XML configuration files, which we can generate programmatically for convenience.
- UIMA App Context
- How to Build an Analysis Pipeline
- Managing the UIMA-AS Lifecycle
- Running an Analysis
- Complete demo
UIMA-AS has some similar affordances to uimaFIT with regard to programmatic configuration, but we can no longer make use of many of the convenience methods (including iteratePipeline) offered by uimaFIT. No matter, we'll write our own!
UIMA App Context
The most basic runtime configuration is stored in a UIMA Application Context, which in the UIMA-AS Java examples is represented as a java.util.Map<String, Object>. For our wrapper, we can create a case class containing all the necessary information with richer types than the original, and a method toMap that will translate it into the Java form.
import java.net.{URI, URL}
import org.apache.uima.resourceSpecifier.factory.SerializationStrategy

case class UimaAppContext(
  dd2SpringXsltFilePath: URL = getClass.getClassLoader.getResource("dd2spring.xsl"),
  saxonClasspath: URL = getClass.getClassLoader.getResource("saxon8.jar"),
  serverUri: URI = new URI("tcp://localhost:61616"), // address of the ActiveMQ broker
  endpoint: String = "uimaAS",                        // name of the service queue
  casPoolSize: Int = 2,                               // CASes (documents) in flight at once
  casInitialHeapSize: Int = 500000, // number of 4-byte words = 2000000 bytes (2 MB)
  applicationName: String = "",
  getMetaTimeout: Int = 60000,      // milliseconds
  timeout: Option[Int] = None,      // None = no timeout
  cpcTimeout: Option[Int] = None,   // None = no timeout
  serializationStrategy: SerializationStrategy = SerializationStrategy.binary) {
  def toMap: java.util.Map[String, Object] = ???
}
object UimaAppContext {
implicit def asMap(appCtx: UimaAppContext): java.util.Map[String, Object] = appCtx.toMap
}
The Option[Int] fields will be omitted from the config map if they are None, signifying an unlimited timeout. All timeouts are specified in milliseconds.
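The omission rule can be sketched in isolation. TimeoutSettings below is a hypothetical, trimmed-down stand-in for UimaAppContext that keeps only the timeout fields, and the key strings are placeholders: the real toMap would use the key constants UIMA-AS itself defines, which are not reproduced here.

```scala
// Hypothetical stand-in for UimaAppContext, keeping only the timeout fields.
// The key strings are placeholders, not the actual UIMA-AS constant names.
case class TimeoutSettings(
    getMetaTimeout: Int = 60000,
    timeout: Option[Int] = None,
    cpcTimeout: Option[Int] = None) {

  def toMap: java.util.Map[String, Object] = {
    val m = new java.util.HashMap[String, Object]()
    // Always present; Int.box turns the Scala Int into a java.lang.Integer
    m.put("GetMetaTimeout", Int.box(getMetaTimeout))
    // Option fields add an entry only when defined; None leaves the key out,
    // which is how "no timeout" is expressed
    timeout.foreach(t => m.put("Timeout", Int.box(t)))
    cpcTimeout.foreach(t => m.put("CpcTimeout", Int.box(t)))
    m
  }
}
```

With the defaults, only the metadata timeout ends up in the map; setting `timeout = Some(5000)` adds a "Timeout" entry.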
Most of these fields can be left as their defaults. Perhaps the most important is casPoolSize, which determines how many documents will be processed concurrently.
Deployment Configuration
Using the UimaAppContext as an input, we can now create a case class for the complete deployment configuration. Most of its settings come from the UimaAppContext, while a few others must be specified in addition.
import org.apache.uima.analysis_engine.AnalysisEngineDescription

case class UimaAsyncDeploymentConfig(
  engineDescs: Seq[AnalysisEngineDescription],
  appCtx: UimaAppContext = UimaAppContext(),
  name: String = "uima-service",
  description: String = "",
  descriptor: String = "",
  async: Boolean = true) {
  def toXML(): String = ???
}
Because UIMA-AS requires the configuration files to exist on disk, the toXML method should return the path of the file to which it has written its configuration (a temp file, in most cases).
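The "write to a temp file, return the path" half of toXML needs only the JDK. In this minimal sketch, DeploymentXml and writeTemp are hypothetical names, and the XML content is left abstract; generating the actual UIMA-AS deployment descriptor is the part toXML would add on top.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

object DeploymentXml {
  // Writes an XML string to a fresh temp file and returns its absolute path,
  // so a later deployment step can refer to the descriptor by file name.
  def writeTemp(xml: String, prefix: String = "uima-deploy-"): String = {
    val path: Path = Files.createTempFile(prefix, ".xml")
    path.toFile.deleteOnExit() // remove the file when the JVM exits
    Files.write(path, xml.getBytes(StandardCharsets.UTF_8))
    path.toAbsolutePath.toString
  }
}
```

Marking the file with deleteOnExit keeps repeated runs from littering the temp directory, at the cost of the descriptor disappearing once the JVM stops.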
How to Build an Analysis Pipeline
The real benefits of UIMA-AS become apparent when some operations can run in parallel, but specifying the flow can be somewhat complicated. For now, we'll reuse the Process abstraction from the prior post, which simply stores a sequence of AnalysisEngineDescriptions to be run one after another.
One unfortunate consequence of switching to asynchronous processing in UIMA is that the convenient Iterator[JCas] interface offered by uimaFIT is no longer usable. By the nature of UIMA-AS, a callback will receive completed CASes in an arbitrary order. Further, just as in synchronous UIMA processing, the results of an analysis are only available while looping over the iterator, since the CAS object is reused to store each document in turn. Therefore, we need to change our interface so that we specify how to collect our desired results at call time, rather than getting an Iterator to be used later.
To resolve this issue, we will pass around the means to collect and process the results of each analysis as a function from JCas => T. In practice, we will often want to know which document the result came from, so we ultimately return a Map[String, T] from our analysis functions (where the String is a unique identifier, drawn from the document title in the JCas metadata).
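Why the extraction function must run at call time can be shown without UIMA. This sketch assumes a hypothetical ReusableCas class standing in for a JCas that is overwritten with each document, and a hypothetical CollectResults.run that plays the role of the analysis function returning Map[String, T].

```scala
// Hypothetical stand-in for a CAS: one mutable object that is overwritten with
// each document in turn, mimicking how UIMA reuses CAS instances.
final class ReusableCas {
  var title: String = ""
  var text: String = ""
}

object CollectResults {
  // Load each (title, text) pair into the single reused CAS and apply `extract`
  // while that document is loaded, keying each result by the document title.
  def run[T](docs: Seq[(String, String)])(extract: ReusableCas => T): Map[String, T] = {
    val cas = new ReusableCas
    docs.map { case (title, text) =>
      cas.title = title
      cas.text = text
      title -> extract(cas) // pull the value out now, before the CAS is reused
    }.toMap
  }
}
```

Extracting a plain value, e.g. `CollectResults.run(docs)(cas => cas.text.split(" ").head)`, gives one correct entry per title. Returning the CAS itself (`cas => cas`) would leave every entry pointing at the same object, which by then holds only the last document.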
Managing the UIMA-AS Lifecycle
Since UIMA-AS is designed to delegate its analyses to remote machines, a bit of
ceremony is needed to start up and shut down its constituent services even when
we’re only running multiple engines on one machine. We’ll define a UimaAsync
class to manage the process, and store an app-wide broker connection in the
companion object.
import scala.concurrent.Future
import org.apache.activemq.broker.BrokerService
import org.apache.uima.aae.client.UimaAsynchronousEngine
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl

class UimaAsync(val config: UimaAppContext = UimaAppContext()) {
  import uimaAS.UimaAppContext._ // implicit UimaAppContext => java.util.Map conversion

  val engine: UimaAsynchronousEngine = new BaseUIMAAsynchronousEngine_impl
  var springContainerId: Option[String] = None // set once the service is deployed

  def start[T](corpus: Corpus, process: Process, block: Util.Block[T],
               collectionTotal: Option[Int] = None): Future[Util.Results[T]] = ???
  def stop(): Unit = ???
}

object UimaAsync {
  val uri = UimaAppContext().serverUri
  var broker: Option[BrokerService] = None

  // Start one embedded ActiveMQ broker for the whole app (no-op if already running)
  def start(): Unit = {
    if (broker.isEmpty) {
      val brokerService = new BrokerService
      brokerService.setBrokerName("localhost")
      brokerService.setUseJmx(false)
      brokerService.addConnector(uri.toString)
      brokerService.start()
      broker = Some(brokerService)
    }
  }

  // Stop the broker and forget it, so that start() can launch a fresh one later
  def stop(): Unit = {
    broker.foreach(_.stop())
    broker = None
  }
}
Running an Analysis
The other important piece is a callback listener (to collect the results that are to be returned, and to confirm that all documents have been processed).
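The listener's two jobs (collecting results and noticing when every document is done) can be sketched with plain Scala concurrency types. CollectingListener is a hypothetical class, not the UIMA-AS callback interface; in the real wrapper its methods would presumably be driven from a UIMA-AS callback such as UimaAsBaseCallbackListener's entityProcessComplete, after the result has been extracted from the CAS. The expected count corresponds roughly to the collectionTotal parameter of UimaAsync.start.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Future, Promise}

// Hypothetical listener: callbacks may arrive from several threads, in any order.
final class CollectingListener[T](expected: Int) {
  private val results = TrieMap.empty[String, T] // thread-safe result store
  private val remaining = new AtomicInteger(expected)
  private val done = Promise[Map[String, T]]()

  // Nothing to wait for: complete immediately
  if (expected <= 0) done.trySuccess(Map.empty)

  def future: Future[Map[String, T]] = done.future

  // Record one document's result; the call that brings the count to zero
  // completes the Future with every collected result.
  def onCompleted(id: String, result: T): Unit = {
    results.put(id, result)
    if (remaining.decrementAndGet() == 0) done.trySuccess(results.toMap)
  }

  // Any per-document failure fails the whole analysis (first failure wins).
  def onFailed(error: Throwable): Unit = done.tryFailure(error)
}
```

Using trySuccess and tryFailure rather than success and failure means a late callback after a failure is silently ignored instead of throwing.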
With all these elements in place, we can finally run an analysis!
First, set up the corpus and the analysis pipeline:
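import org.apache.uima.fit.factory.AnalysisEngineFactory.createEngineDescription
import de.tudarmstadt.ukp.dkpro.core.clearnlp.{ClearNlpLemmatizer, ClearNlpPosTagger, ClearNlpSegmenter}

val corpus = Corpus.fromDir("dir_with_txt_files")
val lemmatize =
  Process(
    createEngineDescription(classOf[ClearNlpSegmenter]),
    createEngineDescription(classOf[ClearNlpPosTagger]),
    createEngineDescription(classOf[ClearNlpLemmatizer]))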
And then run the pipeline asynchronously:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.uima.jcas.JCas
import de.tudarmstadt.ukp.dkpro.core.api.segmentation.`type`.Lemma

UimaAsync.start() // start the ActiveMQ broker
val uimaAS = new UimaAsync()
val getLemmas = { jcas: JCas =>
  jcas.select(classOf[Lemma]).take(5).map(_.getValue).toVector
}
val results: Future[Map[String, Vector[String]]] =
  uimaAS.start(corpus, lemmatize, Process.wrapBlock(getLemmas))
results onSuccess { case allLemmas =>
  allLemmas("1789-Washington.txt") shouldBe Vector("fellow", "-", "citizen", "of", "the")
}
Note that a blocking runMultiThread method has been added to the Process class for convenience; with it, the usage becomes:
val allLemmas = lemmatize.runMultiThread(corpus) { jcas =>
  jcas.select(classOf[Lemma]).take(5).map(_.getValue).toVector
}
allLemmas("1789-Washington.txt") shouldBe Vector("fellow", "-", "citizen", "of", "the")
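The actual runMultiThread lives in the repo. As a hedged sketch of one way such a blocking wrapper could work, the hypothetical Blocking.runBlocking below takes a thunk that starts the asynchronous analysis (standing in for a call like uimaAS.start) and a cleanup thunk (standing in for the stop calls), waits on the resulting Future, and runs the cleanup even if the analysis fails.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

object Blocking {
  // Hypothetical helper: start an asynchronous analysis, block until its Future
  // completes, and always run the cleanup step, whether it succeeded or failed.
  def runBlocking[T](start: () => Future[T], stop: () => Unit,
                     timeout: Duration = Duration.Inf): T =
    try Await.result(start(), timeout) // rethrows the Future's exception on failure
    finally stop()
}
```

Putting stop() in a finally block matters here because a failed analysis would otherwise leave the deployed services (and possibly the broker) running.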
Complete demo
Using UIMA-AS required a lot of code in the end, so it’s not as easy to encapsulate in a (concise) post. The complete repo with tests is available at: https://github.com/corajr/dkpro-scala-example-2
To run the tests on this repo: ./activator test
Ultimately, while the comparative standardization and wide range of analysis engines available for UIMA are attractive, it feels somewhat cumbersome to use from Scala. Other libraries, like Spark CoreNLP, might be a better fit for most applications.