Streaming Ona Data with NiFi, Kafka, Druid, and Superset

Thursday, August 31, 2017

A common need across all our projects, and in our partners’ projects, is to build up-to-date indicators from stored data. In our malaria spraying project (mSpray), our work monitoring the drought response in Somalia, and our electronic medical record system work for OpenSRP, we need to build dashboards showing the progress of the project and other information relevant to stakeholders. Currently we do this on an ad-hoc basis, but we are in the process of building a unified pipeline to move data from heterogeneous systems into a data warehouse and build indicators on top of this data.

This need breaks down into the following minimal requirements:

  1. Store data
  2. Store queries relative to the data
  3. Retrieve the results of queries executed against the latest data

The queries may be complex, e.g. containing grouping, filtering, and aggregation clauses, and the datasets may be large, e.g. hundreds of millions of rows. In order to use this information in on-demand dashboards, we must be able to retrieve the results of these complex queries in ~100ms. We would also like to build in the flexibility to expand the system over time to handle complex event processing and machine learning tasks. To handle these requirements we are building a system composed of the following tools:

  1. Data ingestion using Apache NiFi
  2. Data warehousing in the Hadoop Distributed File System (HDFS)
  3. Stream processing using Apache Kafka
  4. OLAP database storage using Druid
  5. Visualization using Apache Superset

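When everything is integrated, the data flow looks like this: the Ona API feeds NiFi, which routes data to Kafka and HDFS; Druid indexes the Kafka stream; and Superset queries Druid.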


Below we walk through what we’ve done so far to build this system and provide instructions you can follow to set it up yourself for testing. The walkthrough is also useful for understanding the different tools and the processes they rely on.

Note that this setup is not designed for a production environment, although the overall structure is the same. In production we use a number of automated infrastructure, monitoring, and redundancy tools that are not discussed here.

All of this would be much easier to get up and running using Docker or another container system, but to get a deeper understanding of how everything works under the hood, we did not take that approach. We would love for someone to contribute a Docker file combining all the tools below; let us know if you do and we will post an update.

Note that the Hortonworks Dataflow (HDF) platform provides a Docker file containing a superset of the above tools, and this GoDataDriven article describes a similar setup and includes a Docker file using Divolte (a clickstream collector) instead of NiFi.

Data Ingestion and Coordination using NiFi

To coordinate the tasks of retrieving data from multiple APIs, minimally transforming it, and appropriately routing it, we use Apache NiFi. We are going to use NiFi to pull historical data from the Ona API and push that data into Kafka and HDFS for later processing. (The Future Work section below describes how we do this in production.)

On macOS we can install NiFi with brew install nifi, and then run it with:

/usr/local/bin/nifi run

After that, NiFi will be available at http://localhost:8080/. Once you have loaded NiFi, you can create processors to fetch, store, or process data, as well as define the connections between these processors. To get started quickly, here’s an Ona API NiFi flow in XML. It pulls data from the Ona API every 60 seconds and routes it to Kafka. The imported flow looks like this:


If you read through the flow, you’ll see that we do a number of preprocessing steps before writing the data. When InvokeHTTP runs, it outputs API data as an array of JSON maps. We then convert this batch output into a single output per JSON map using the SplitJson processor. For example, this takes a single record that is an array of 10 JSON maps and converts it into 10 records, each of which is a single JSON map.

var flowFile = session.get();

if (flowFile != null) {
  // Java classes used by the Nashorn (ECMAScript) engine in ExecuteScript
  var StreamCallback =  Java.type("");
  var IOUtils = Java.type("");
  var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");

  flowFile = session.write(flowFile,
    new StreamCallback(function(inputStream, outputStream) {
        var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
        var obj = JSON.parse(text);
        // Build a new map instead of adding and deleting keys while iterating
        var out = {};
        for (var key in obj) {
          var value = obj[key];
          // Druid cannot index nested JSON, so serialize nested values to strings
          if (typeof value === 'object' && value !== null) {
            value = JSON.stringify(value);
          }
          // Replace the XForm path separator (/) in keys with -
          out[key.replace(/\//g, '-')] = value;
        }
        outputStream.write(JSON.stringify(out, null, '\t').getBytes(StandardCharsets.UTF_8));
    }));
  // Rename the output, e.g. data.json becomes data_translated.json
  flowFile = session.putAttribute(flowFile, 'filename', flowFile.getAttribute('filename').split('.')[0] + '_translated.json');
  session.transfer(flowFile, REL_SUCCESS);
}

Because the Ona data is based on the XForm spec, keys in this JSON map may contain forward slashes (/), and values may contain nested JSON. Druid cannot process a JSON map containing either of these. Therefore, we use an ExecuteScript processor to convert /s into -s and nested JSON maps into strings. The ECMAScript Script Body used by the ExecuteScript processor is above; this code was inspired by this Fun with Apache NiFi article. (The Future Work section below describes how we plan to better handle nested JSON in production.)
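To try this preprocessing outside NiFi, here is a minimal Node.js sketch of both steps, assuming the API response body is a JSON array of submission maps. The function names splitRecords and sanitizeRecord are hypothetical: splitRecords mimics what SplitJson does with the response, and sanitizeRecord mirrors the ExecuteScript body above without NiFi’s session API.

```javascript
// Minimal Node.js sketch of the NiFi preprocessing (hypothetical helpers).

// Mimic SplitJson: turn one API response (a JSON array) into one record per map.
function splitRecords(apiResponse) {
  const records = JSON.parse(apiResponse);
  if (!Array.isArray(records)) {
    throw new Error("expected the API response to be a JSON array");
  }
  return records;
}

// Mirror the ExecuteScript body: stringify nested values and replace "/" in keys.
function sanitizeRecord(record) {
  const out = {};
  for (const [key, value] of Object.entries(record)) {
    const flatValue =
      value !== null && typeof value === "object" ? JSON.stringify(value) : value;
    out[key.replace(/\//g, "-")] = flatValue;
  }
  return out;
}

// Example usage with a made-up response body:
const body = '[{"group/name": "a", "_geolocation": [1, 2]}, {"age": 7}]';
console.log(splitRecords(body).map(sanitizeRecord));
```

Writing into a fresh object, rather than renaming keys in place, avoids mutating the map while iterating over it.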

Because Ona’s data is only accessible securely over HTTPS, we must configure NiFi to work over SSL. To do this we will use the Java keytool to create a KeyStore and a TrustStore. Then we will import an RSA certificate into our local TrustStore.

To do this, first download the root certificate for the SSL certificate used by Ona (or whatever other HTTPS service your API is hosted behind); here’s Ona’s USERTrustRSACertificationAuthority.crt. Next, update the first two lines of the script below with your own passwords and run the script in the directory where you downloaded the root certificate:

KEYPASS=your-key-password
STOREPASS=your-store-password

echo "Generate server certificate and export it"
${JAVA_HOME}/bin/keytool -genkey -alias server-alias -keyalg RSA -keypass $KEYPASS -storepass $STOREPASS -keystore keystore.jks
${JAVA_HOME}/bin/keytool -export -alias server-alias -storepass $STOREPASS -file server.cer -keystore keystore.jks

echo "Create trust store"
${JAVA_HOME}/bin/keytool -import -v -trustcacerts -alias server-alias -file server.cer -keystore cacerts.jks -keypass $KEYPASS -storepass $STOREPASS

echo "Import CA certificate"
${JAVA_HOME}/bin/keytool -importcert -file USERTrustRSACertificationAuthority.crt -alias RSA -keystore cacerts.jks -storepass $STOREPASS

This will create the KeyStore keystore.jks and the TrustStore cacerts.jks with the RSA certificate loaded into it. It will also create a server.cer, which we can ignore.

To use these new certificates, select the InvokeHTTP processor, then in its Configuration under PROPERTIES, click the arrow next to the SSL Context Service property.


In the new modal window select the StandardSSLContextService, then hit the info circle to edit its properties. Now set the Keystore Type and Truststore Type to JKS, then set the Keystore Filename to the location of your KeyStore and the Truststore Filename to the location of your TrustStore. Finally, set the passwords to the values you provided in the above script. Hit OK, then use the lightning bolt on the right-hand side of the row to enable your new ContextService. If all goes well, you should now be able to connect to HTTPS URIs.

To get some data, go back to the flow window and edit the InvokeHTTP processor to set your Ona username, password, and form ID in the URL field. Edit the PutFile processor to specify a directory to write flat files to (this is useful for debugging the data returned by the API and processed by NiFi). Now start the InvokeHTTP, LogAttribute, SplitJson, ExecuteScript, and PutFile processors. You should see queues filling and files accumulating in the directory you specified. If all looks good, stop the processors and let’s move on to the next step.

Real-time Stream Processing using Kafka

Apache Kafka is a distributed streaming platform built around the publish-subscribe messaging pattern. We will use Kafka to receive incoming messages and publish them to a topic that Druid will subscribe to. The Druid indexer will read these messages and insert them into its database. As of this writing, the latest Kafka version that NiFi components support is 0.10, so we will install Kafka 0.10. Use the commands below to download version 0.10, extract it, and change to its directory:

curl -O
tar xzvf kafka_2.10-
cd kafka_2.10-

For our simple local setup we will use the default configurations. Kafka, and later Druid, use ZooKeeper for coordination, so we first start a ZooKeeper server:

./bin/ config/

Leave this running and open a new console in which to start the Kafka process:

./bin/ config/

To view the clusters and topics, we will install Kafka Manager. This lets us easily check the insertions we are performing and more quickly surface any errors we might encounter. To install Kafka Manager, first clone the GitHub repository and change to its directory:

git clone
cd kafka-manager

Next, we’ll build Kafka Manager and run it in the foreground. Since this is a Scala project, be sure you have the Scala Build Tool (sbt) installed, e.g. with brew install sbt.

sbt clean dist

With the Kafka Manager running you can visit http://localhost:9000/ to view the current Kafka Clusters, Topics, and Consumers.

High-performance queries with Druid

Druid is a column-oriented data store, comparable to Vertica, Apache Impala, or Dremel. To power the complex real-time queries behind our dashboards, we will index our data in Druid. Once the data is indexed, we can use Superset for exploratory analysis and to define the JSON queries that we will execute against the Druid API to build our dashboards.
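As an illustration of the kind of JSON query Druid accepts, here is a minimal Node.js sketch that builds a native timeseries query counting rows per day and POSTs it to a broker. It assumes Node.js 18+ (for the built-in fetch) and a broker at localhost:8082, the broker port in Druid’s quickstart configuration; the helper names dailyCountQuery and runQuery are hypothetical.

```javascript
// Build a Druid native timeseries query that counts rows per day.
// "rows" is an arbitrary name for the output column.
function dailyCountQuery(dataSource, interval) {
  return {
    queryType: "timeseries",
    dataSource: dataSource,
    granularity: "day",
    intervals: [interval],
    aggregations: [{ type: "count", name: "rows" }],
  };
}

// POST a query to the broker's /druid/v2/ endpoint (Node.js 18+ for fetch).
async function runQuery(query, brokerUrl = "http://localhost:8082/druid/v2/") {
  const res = await fetch(brokerUrl, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(query),
  });
  if (!res.ok) {
    throw new Error(`Druid broker returned HTTP ${res.status}`);
  }
  return res.json();
}

// Example usage, once the Druid services below are running:
// runQuery(dailyCountQuery("ona-kafka", "2016-07-29/2016-07-30")).then(console.log);
```

Note that Druid’s count aggregator counts stored rows, which can be fewer than the ingested events if rollup combined identical rows.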

To start up Druid, first download the Druid 0.10.0 binary distribution, decompress it, and change into its directory:

curl -O
tar xzvf druid-0.10.0-bin.tar.gz
cd druid-0.10.0

To start the Druid services required for our use case, we will run five processes, one each for:

  • a historical server, which serves stored data segments,
  • a broker, which routes queries,
  • a coordinator, which manages how segments are distributed,
  • an overlord (the master manager), which assigns indexing tasks,
  • and a middle manager, which runs indexing tasks.

To do this, run each of the below commands in a separate console window:

java `cat conf-quickstart/druid/historical/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/historical:lib/*" io.druid.cli.Main server historical
java `cat conf-quickstart/druid/broker/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/broker:lib/*" io.druid.cli.Main server broker
java `cat conf-quickstart/druid/coordinator/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/coordinator:lib/*" io.druid.cli.Main server coordinator
java `cat conf-quickstart/druid/overlord/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/overlord:lib/*" io.druid.cli.Main server overlord
java `cat conf-quickstart/druid/middleManager/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/middleManager:lib/*" io.druid.cli.Main server middleManager

After starting the above processes, check the Druid Console at http://localhost:8081 to make sure everything looks good.


To manage the continuous process that indexes Kafka data, we’ll download and run Druid’s Tranquility tool. Assuming you’re in the directory that contains the druid-0.10.0 folder, use the following commands to download Tranquility 0.8.0, decompress it, change into its directory, and start the Tranquility server:

curl -O
tar xzvf tranquility-distribution-0.8.0.tgz
cd tranquility-distribution-0.8.0
bin/tranquility server -configFile ../druid-0.10.0/conf-quickstart/tranquility/server.json

To read from a Kafka stream, we define a configuration file that describes a data source name, a Kafka topic to read from, and some properties of the data we read. Save the JSON configuration below as ona-kafka.json and place it in the druid-0.10.0 directory:

{
  "dataSources" : {
    "ona-kafka" : {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "ona-kafka",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "_submission_time",
                "format" : "auto"
              },
              "dimensionsSpec" : {
                "dimensions" : [],
                "dimensionExclusions" : []
              },
              "format" : "json"
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "six_hour",
            "queryGranularity" : "none"
          },
          "metricsSpec" : []
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT720000M"
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "ona-api-test"
      }
    }
  },
  "properties" : {
    "zookeeper.connect" : "localhost",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "commit.periodMillis" : "15000",
    "consumer.numThreads" : "2",
    "kafka.zookeeper.connect" : "localhost",
    "" : "tranquility-kafka"
  }
}

This instructs Tranquility to read from the topic ona-api-test and push the messages it receives into a Druid data source called ona-kafka. In each message it reads, Tranquility uses the _submission_time key as the timestamp. What is unusual about this file is the very large windowPeriod of PT720000M (720,000 minutes, or 500 days). We do this because Druid ignores any messages whose timestamps fall outside the windowPeriod around the current time. Because we are using historical data for this test, our timestamps are far outside the typical window period of an hour or so.
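The windowPeriod rule can be sketched in a few lines of Node.js. This is a simplified model, assuming an event is kept only if its timestamp lies within windowPeriod of the current time in either direction; it parses only the PTnH/PTnM/PTnS duration forms used in this post, and the function names parseDurationMs and withinWindow are hypothetical.

```javascript
// Parse a restricted ISO 8601 duration (PTnH, PTnM, PTnS or a mix) to milliseconds.
function parseDurationMs(iso) {
  const m = /^PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?$/.exec(iso);
  if (!m || (!m[1] && !m[2] && !m[3])) {
    throw new Error(`unsupported duration: ${iso}`);
  }
  const [hours, minutes, seconds] = [m[1], m[2], m[3]].map((x) => Number(x || 0));
  return ((hours * 60 + minutes) * 60 + seconds) * 1000;
}

// Simplified model: keep an event only if it is within windowPeriod of "now".
function withinWindow(eventTime, windowPeriod, now = new Date()) {
  const delta = Math.abs(now.getTime() - new Date(eventTime).getTime());
  return delta <= parseDurationMs(windowPeriod);
}

// Example: a July 2016 submission checked in August 2017.
const checkedAt = new Date("2017-08-31T00:00:00Z");
console.log(withinWindow("2016-07-29T10:00:00Z", "PT60M", checkedAt));
console.log(withinWindow("2016-07-29T10:00:00Z", "PT720000M", checkedAt));
```

With a one-hour window the historical submission is dropped, while the 500-day window keeps it, which is why the configuration above uses such a large value.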

We can start an indexer task using our ona-kafka.json spec with:

bin/tranquility kafka -configFile ../druid-0.10.0/ona-kafka.json

Alternatively, you can put data directly into Druid and skip the NiFi and Kafka steps. To do this, use the Hadoop batch indexing task below:

{
  "type" : "index_hadoop",
  "spec" : {
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "paths" : "/path/ona-api-demo.json"
      }
    },
    "dataSchema" : {
      "dataSource" : "ona-json",
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "day",
        "queryGranularity" : "none",
        "intervals" : ["2016-07-29/2016-07-30"]
      },
      "parser" : {
        "type" : "hadoopyString",
        "parseSpec" : {
          "format" : "json",
          "dimensionsSpec" : {
            "dimensions" : []
          },
          "timestampSpec" : {
            "format" : "auto",
            "column" : "_submission_time"
          }
        }
      },
      "metricsSpec" : []
    },
    "tuningConfig" : {
      "type" : "hadoop",
      "partitionsSpec" : {
        "type" : "hashed",
        "targetPartitionSize" : 5000000
      },
      "jobProperties" : {}
    }
  }
}

This assumes the file /path/ona-api-demo.json contains valid JSON data from the Ona API, one JSON map per line, that has been transformed to remove /s from keys and nested JSON from values. It also assumes that the _submission_time of each record falls within the intervals value. Adjust both based on your demo data. Assuming you have saved an adjusted configuration file like the one above as ona-api-index.json, you can load the demo JSON file into Druid using:
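Before posting the task, it can help to check which records fall outside the intervals value. This minimal Node.js sketch assumes the input holds one JSON map per line, treats timestamps without an explicit offset as UTC (the Druid quickstart runs its JVMs in UTC), and follows Druid’s convention that an interval includes its start and excludes its end; the function names parseUtc and recordsOutsideInterval are hypothetical.

```javascript
// Parse a timestamp, treating date-times without an offset as UTC (an assumption).
function parseUtc(ts) {
  const hasOffset = /(Z|[+-]\d{2}:?\d{2})$/.test(ts);
  const isDateOnly = !ts.includes("T"); // date-only strings already parse as UTC
  return new Date(hasOffset || isDateOnly ? ts : ts + "Z").getTime();
}

// Return the records whose _submission_time is missing or outside [start, end).
function recordsOutsideInterval(ndjson, interval) {
  const [start, end] = interval.split("/").map(parseUtc);
  return ndjson
    .split("\n")
    .filter((line) => line.trim() !== "")
    .map((line) => JSON.parse(line))
    .filter((record) => {
      if (typeof record._submission_time !== "string") {
        return true; // no usable timestamp, so Druid could not place this row
      }
      const t = parseUtc(record._submission_time);
      return !(t >= start && t < end);
    });
}

// Example usage against the file referenced in the task spec:
// const text = require("fs").readFileSync("/path/ona-api-demo.json", "utf8");
// console.log(recordsOutsideInterval(text, "2016-07-29/2016-07-30").length);
```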

curl -X 'POST' -H 'Content-Type:application/json' -d @ona-api-index.json localhost:8090/druid/indexer/v1/task

Visualization with Superset

Superset lets us explore the data in our Druid index and build interactive JSON queries which we can later use to power dashboards. To get this up and running we will first need to install Superset. You can follow the instructions in the Superset Getting Started or, assuming you have installed Python’s virtualenvwrapper, use the below commands for macOS, which are mainly taken from there:

# Install required libraries
brew install pkg-config libffi openssl python

# Install the python cryptography library
env LDFLAGS="-L$(brew --prefix openssl)/lib" CFLAGS="-I$(brew --prefix openssl)/include" pip install cryptography==1.7.2

# Create a virtual environment
mkvirtualenv superset

# Upgrade your new virtual environment
pip install --upgrade setuptools pip

# Install superset
pip install superset

# Create an admin user (you will be prompted to set username, first and last name before setting a password)
fabmanager create-admin --app superset

# Initialize the database
superset db upgrade

# Load some data to play with
superset load_examples

# Create default roles and permissions
superset init

Then, start Superset with:

# Start the web server on port 8088, use -p to bind to another port
superset runserver

Finally, we need to add a Druid cluster in Superset so that we can access the data stored in the Druid index. To do this, visit the dashboard at http://localhost:8088, navigate to the Sources item in the top menu bar, then choose the Druid Clusters option.


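This list should be empty. Hit the teal plus button on the left-hand side and enter the connection details for your local Druid cluster: the coordinator at localhost port 8081 (the console we checked earlier) and the broker, which listens on port 8082 in Druid’s quickstart configuration.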


Now return to the Sources menu at the top and select Refresh Druid Metadata. After this, again in the Sources menu, navigate to Druid Datasources and you will see your datasource: ona-kafka if you used the NiFi and Kafka streaming setup, or ona-json if you posted the static JSON file to the indexer.

You can now use Superset to explore this dataset. Try creating some queries against it and visualizing the outputs.


Future Work

Differentiate Batch and Stream Ingestion

Using NiFi to push data pulled from the API into Kafka was only for the purposes of this proof of concept. In production we will handle streaming data by setting up a listener that receives Ona webhooks containing real-time create, update, and delete events and pushes these into Kafka and HDFS.

We will still use one-time pulls from the API to synchronize historical data from Ona into HDFS and then into Druid. We will begin by using NiFi for this task, but plan to explore alternative tools as well.

Handling Nested JSON

We mentioned above that Druid is not able to index nested JSON, which our records contain, and that we avoid this problem by serializing the nested JSON values to strings. This is a real limitation, because we cannot run queries against individual key values inside the serialized JSON.

One potential solution is to build a semi-relational structure: add a UUID to each incoming JSON map, then split out each nested JSON map as its own record with its own UUID, storing its parent’s UUID as a foreign key along with a label key whose value is the nested map’s key in the original record. For example, assuming we reserve the __.+__ namespace pattern for our internal keys, the record:

{
  "key1": "value1",
  "key2": {
    "inner-key1": "inner-value1",
    "inner-key2": {
      "inner-inner-key1": "inner-inner-value1"
    }
  }
}

would become three records. The first is the outer record, which is assigned a __uuid__:

{
  "__uuid__": "AAAA",
  "key1": "value1"
}

The second is the first nested JSON map, which references its parent through __parent__ and stores its original key in __label__:

{
  "__uuid__": "BBBB",
  "__parent__": "AAAA",
  "__label__": "key2",
  "inner-key1": "inner-value1"
}

The third is the doubly nested JSON map, which references only its immediate parent:

{
  "__uuid__": "CCCC",
  "__parent__": "BBBB",
  "__label__": "inner-key2",
  "inner-inner-key1": "inner-inner-value1"
}

This structure lets children easily query their parents, but not the other way around. To let parents query their children, we could store the child’s UUID as the value of what used to be a nested JSON map:

{
  "__uuid__": "AAAA",
  "key1": "value1",
  "key2": "BBBB"
}
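A minimal Node.js sketch of this semi-relational split follows, including the optional parent-to-child UUID from the last example. The function name toLinkedRecords is hypothetical, IDs come from crypto.randomUUID() (Node.js 14.17+) unless a generator is passed in, and arrays are left untouched, which a real implementation would still need to handle.

```javascript
const crypto = require("crypto");

// Split a nested JSON map into flat records linked by __uuid__, __parent__,
// and __label__. Pass newId to get deterministic IDs (e.g. in tests).
function toLinkedRecords(record, newId = () => crypto.randomUUID(), parentId = null, label = null) {
  const current = { __uuid__: newId() };
  if (parentId !== null) {
    current.__parent__ = parentId; // child -> parent link
    current.__label__ = label; // the key this map had in its parent
  }
  const records = [current];
  for (const [key, value] of Object.entries(record)) {
    const isMap = value !== null && typeof value === "object" && !Array.isArray(value);
    if (isMap) {
      const children = toLinkedRecords(value, newId, current.__uuid__, key);
      current[key] = children[0].__uuid__; // parent -> child link
      records.push(...children);
    } else {
      current[key] = value; // scalars and arrays are kept as-is
    }
  }
  return records;
}

// Example usage with the record above and fixed IDs:
const example = {
  key1: "value1",
  key2: { "inner-key1": "inner-value1", "inner-key2": { "inner-inner-key1": "inner-inner-value1" } },
};
const ids = ["AAAA", "BBBB", "CCCC"];
let next = 0;
console.log(toLinkedRecords(example, () => ids[next++]));
```

With fixed IDs this produces the three records above, except that BBBB also carries "inner-key2": "CCCC" as its pointer to the doubly nested child.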

The major weakness of this approach is that it adds significant structural complexity to the datasets. Relatedly, records we expected to share the same keys no longer would. An alternative approach is to flatten our data by pushing all nested keys to the top level and renaming them using their parent keys as prefixes. Again reserving the __.+__ namespace pattern for generated keys, and also using __ as a separator, this would produce a final JSON map of:

{
  "key1": "value1",
  "__key2__inner-key1__": "inner-value1",
  "__key2__inner-key2__inner-inner-key1__": "inner-inner-value1"
}

when applied to the same example dataset.
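The flattening approach is simpler to sketch. This minimal Node.js version (the function name flattenRecord is hypothetical) prefixes nested keys with their parent keys, using __ as both wrapper and separator as described, and leaves arrays as they are.

```javascript
// Flatten nested JSON maps into one map, naming nested keys __parent__child__.
function flattenRecord(record, path = []) {
  const out = {};
  for (const [key, value] of Object.entries(record)) {
    const keyPath = [...path, key];
    const isMap = value !== null && typeof value === "object" && !Array.isArray(value);
    if (isMap) {
      Object.assign(out, flattenRecord(value, keyPath)); // recurse into nested maps
    } else if (keyPath.length === 1) {
      out[key] = value; // top-level keys keep their original names
    } else {
      out[`__${keyPath.join("__")}__`] = value;
    }
  }
  return out;
}

console.log(flattenRecord({
  key1: "value1",
  key2: { "inner-key1": "inner-value1", "inner-key2": { "inner-inner-key1": "inner-inner-value1" } },
}));
```

Applied to the example record, this yields the same three keys as the flattened map above. Every record still ends up as a single flat map, at the cost of long generated key names.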

Calculating Surprise Values in ClojureScript

Saturday, April 01, 2017

Earlier this year I rewrote Ben Birnbaum’s outlier-detect Python code in ClojureScript and combined it with some very simple React glue code to build an on-demand s-Value calculator. s-Values represent the amount of surprise in a grouped value relative to other data in a larger dataset. A higher s-Value means the data is less expected.

The output is based on what Birnbaum presented as a screenshot of a prototype in his dissertation, Algorithmic approaches to detecting interviewer fabrication in surveys. In that version the color of the text changed depending on the s-Value. In my version the text color is constant, but the background color of each cell is a different shade of red, with darker shades indicating an s-Value more standard deviations from the mean for that row.
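The shading rule can be sketched as follows. This is a hypothetical JavaScript illustration, not the calculator’s ClojureScript code: it assumes shading applies to s-Values above the row mean, uses the population standard deviation, and caps the scale at three standard deviations; the function name rowShades and the HSL lightness range are arbitrary choices.

```javascript
// Map each s-Value in a row to a red background: the more standard deviations
// above the row mean, the darker (lower lightness) the shade.
function rowShades(row) {
  const mean = row.reduce((sum, v) => sum + v, 0) / row.length;
  const variance = row.reduce((sum, v) => sum + (v - mean) ** 2, 0) / row.length;
  const sd = Math.sqrt(variance);
  return row.map((v) => {
    const z = sd === 0 ? 0 : (v - mean) / sd;
    const t = Math.min(Math.max(z, 0), 3) / 3; // clamp z to [0, 3], scale to [0, 1]
    return `hsl(0, 100%, ${Math.round(95 - 45 * t)}%)`;
  });
}

console.log(rowShades([0.4, 0.5, 0.6, 3.2]));
```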


You can use the web interface to generate s-Values from a JSON file (a list of maps) loaded by URL, or use your credentials to load data in your account. This is very much a work in progress; please get in touch or open an issue with any feedback or ideas you have.

Tech at Ona: What We Built in 2016

Friday, December 30, 2016

This is a repost of an article written for the Ona blog. It’s been a big year for the Ona tech team! In this post, we look at what we built in 2016.

Ona platform tech in 2016

In 2016 we added more new features to the Ona platform than in the previous two years combined. Here’s a run-down of select features we added to Ona in 2016:

  • CSV uploads – Upload any CSV into Ona and we’ll automatically build an XLSForm from the CSV’s columns with data types guessed based on the data. E.g. if a column has only dates we’ll assume it should be formatted as a date type, but give you the option to adjust that.
  • Photo gallery – View only the images from your dataset in a grid-based gallery or full-screen slideshow.
  • Dynamic form linking – Use the data in one form to populate questions in another. For example, you could use a school registration form to collect the list of all the schools in your district, and then in a school performance form you could have a drop-down menu where users choose one of the schools from the registration form and then add additional performance data about that school.
  • RapidPro integration – Forward incoming data from Ona to RapidPro and trigger flows based on that data. E.g. send out a text message to a number submitted in an Ona form with a message based on that submission’s data.
  • Google Sheets integration – Connect your dataset to Google Sheets and as you submit new data, or edit existing data, Ona will update your spreadsheet. You can use this to create lightweight dashboards with realtime data collected using Ona.
  • HXL support – Tag your dataset columns with HXL codes for easy integration into the Humanitarian Data Exchange and other existing datasets or repositories.
  • Save charts to a dashboard and chart group by – Create charts with one column grouped by another column and save any charts you create to a dashboard.
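As a rough illustration of the type guessing the CSV upload feature describes, here is a hypothetical JavaScript sketch; it is not Ona’s implementation. It assumes dates are written as YYYY-MM-DD and maps each column to one of the XLSForm types integer, decimal, date, or text.

```javascript
// Hypothetical sketch: guess an XLSForm question type from a CSV column's values.
function guessXlsformType(values) {
  const cells = values.map((v) => v.trim()).filter((v) => v !== "");
  if (cells.length === 0) {
    return "text"; // nothing to go on, so fall back to free text
  }
  const every = (test) => cells.every(test);
  if (every((v) => /^-?\d+$/.test(v))) {
    return "integer";
  }
  if (every((v) => /^-?(\d+\.?\d*|\.\d+)$/.test(v))) {
    return "decimal"; // columns mixing integers and decimals land here
  }
  if (every((v) => /^\d{4}-\d{2}-\d{2}$/.test(v) && !Number.isNaN(Date.parse(v)))) {
    return "date";
  }
  return "text";
}

console.log(guessXlsformType(["2016-07-29", "2016-08-01"])); // "date"
```

Checking the strictest type first means a column only falls back to text when no narrower type fits every non-empty cell, which matches the “guess, then let the user adjust” behavior described above.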

In addition to the new features above, we improved performance to handle the 4.5 million new submissions we received. This was a jump from an average of about 6,500 submissions a day in 2015 to 12,500 per day in 2016. Next year we’ll be putting even more focus on performance and fixing anything that might be slowing you down.

OpenSRP tech in 2016

We’ve made significant improvements to the OpenSRP platform. As the technical lead on OpenSRP, our biggest task this year was transitioning the server and client to an Event/Client data model. This helped us support more efficient client-server data synchronization.

We’ll continue to be busy with OpenSRP next year. We’re about to roll out a number of new implementations, including a generic vaccination register. And we’re also very excited that the UNICEF Innovation Fund invested in OpenSRP as one of their inaugural five technology investments.

Free Open Source Software at Ona in 2016

We’re still improving the documentation and doing clean up, but in late 2016 we published an updated version of our core data collection application, onadata (Github). This fixes some serious issues encountered when running at scale, introduces a more robust permissions model based around projects, and stores all data in a PostgreSQL + PostGIS database. All new development will take place on the master branch of this repository, with stable releases (Github) tagged.

We’ve continuously updated milia (Github), our Clojure/Script Ona Client API library. We’ve added libraries to interact with more API endpoints and improved overall stability. Also in the Clojure world, we’ve been incrementally adding functionality to our Clojure/Script utilities library chimera (Github) and our data viewer library hatti (Github).

This year we released an Ona to R integration. This lets you load realtime datasets directly into your R scripts. Forest Carbon used Ona.R to write an R Shiny web application that automated analysis and feedback. The source code for ona.R (Github) is freely available, and we’re looking forward to your patches and extensions!

Finally, we’ve released a public version of the STEPS app developed for the World Health Organization’s STEPwise approach to noncommunicable disease risk factor surveillance. If you’re interested, check out the code for the Android steps-app (Github).

We’re excitedly looking forward to an even bigger year in 2017. Happy new year from the Ona technology team!

Ghetto by Mitchell Duneier

Tuesday, April 12, 2016

About 5 years ago I was a researcher at Princeton and worked with Mitchell Duneier. We used historical text analysis to evaluate his thesis, and it’s great to see his book in print. For part of the analysis I used Google’s text n-gram data and a mix of Python, awk, bash, and R scripts.


Here’s a New York Times review with more details on his work.

Map Your World and Ona at Geo for Good 2015

Wednesday, November 25, 2015

This past October I spoke on behalf of Map Your World at the 2015 Geo for Good User Summit.

Map Your World empowers youth to explore issues and ideas that matter - like clean drinking water, or food justice – then write surveys, collect data, and create maps to make change in their communities.

Map Your World is powered by the Ona API.

Below is a video of the talk I gave, which includes a clip from the film The Revolutionary Optimists.