A simple way to build your Real-Time dashboard

Diogo Santos
6 min read · Jun 10, 2020

Building a live dashboard can be a headache because of its complex architecture and hard maintenance. Nowadays data is power, and as soon as it is extracted, transformed, and loaded (ETL), our systems can give us answers in real time.

A typical architecture for this kind of system consumes data from an external source (Apache Kafka, Kinesis, RabbitMQ …) and writes it to persistent storage such as Apache Cassandra or MySQL. This requires another service to fetch the data from that storage, and the complexity starts growing:

  • maintenance
  • IOPS
  • throughput
  • data consistency
  • availability

Of course, other solutions face the same problems, but a system with a limited scope is simpler to maintain and evolve than a complex one.

Queryable State

Apache Flink has a feature that exposes keyed state to the outside world, reducing the need for persistent storage in the middle just to share this data.

If you want to know more about Apache Flink, you can read the previous post.

Although this feature is powerful, not only for building a live dashboard but also for other purposes such as tracing messages or tracking workflow state, keep in mind that it only works while the Apache Flink job is running. Restarts due to errors or maintenance, migrations, and scaling up or down all cause an outage in data accessibility.

Architecture

Looking at the diagram, it is easy to see the main components that make up the Queryable State feature:

  1. QueryableStateClient runs in the external application (potentially outside the Flink cluster); it submits the user queries and returns the results to that application.
  2. QueryableStateClientProxy runs on each TaskManager and is responsible for receiving the client's requests and returning the responses. At first sight this seems simple, but as the diagram below shows, the data is spread across the cluster: the data requested by the client may live on another TaskManager, so the proxy needs to know where it is, and the JobManager is the component that has that information.
  3. QueryableStateServer runs on each TaskManager and is responsible for serving the locally stored state for the requested key to the client proxy.

Request flow
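To make the request flow concrete, here is a small, self-contained Kotlin model of it. It is not Flink code: StateServer, LocationRegistry, ClientProxy and ingest are hypothetical stand-ins for the QueryableStateServer, the JobManager's knowledge of where state lives, and the QueryableStateClientProxy. As a further simplification, a key is placed on a TaskManager with a plain hash modulo the number of TaskManagers, whereas Flink actually assigns keys to key groups.

```kotlin
// Hypothetical model of the queryable state request flow; NOT Flink's implementation.

// Stands in for a QueryableStateServer: serves the state stored locally on one TaskManager.
class StateServer(val taskManager: String) {
    private val localState = mutableMapOf<String, String>()
    fun put(key: String, value: String) { localState[key] = value }
    fun serve(key: String): String? = localState[key]
}

// Stands in for the JobManager, which knows where each key's state lives.
// Simplification: hash(key) mod number of TaskManagers (Flink uses key groups).
class LocationRegistry(private val servers: List<StateServer>) {
    fun locate(key: String): StateServer =
        servers[Math.floorMod(key.hashCode(), servers.size)]
}

// Stands in for a QueryableStateClientProxy: receives the client's request,
// asks where the key lives (caching the answer in this model), and forwards it.
class ClientProxy(private val registry: LocationRegistry) {
    private val locationCache = mutableMapOf<String, StateServer>()
    fun query(key: String): String? {
        val server = locationCache.getOrPut(key) { registry.locate(key) }
        return server.serve(key)
    }
}

// Writes go to the server that owns the key, as a keyed operator's would.
fun ingest(registry: LocationRegistry, key: String, value: String) =
    registry.locate(key).put(key, value)
```

The point of the model is the extra hop: the proxy that receives a request is not necessarily the one holding the key, so a location lookup always precedes the actual read.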

Demo of Real-time state IoT

The source code can be found here, and you can easily set up the project by following the README.

The goal of RIOT (Real-time state IoT) is to provide insight, as soon as possible, into what is going on in a system made of devices that report their current state. RIOT consumes two streams: one containing the state of the devices and another containing the triggers. Each consumed state is evaluated against the triggers to decide whether an alert should be raised.

Triggers can be added or changed at runtime without stopping the Flink job.
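The evaluation described above can be sketched in plain Kotlin. DeviceState, Trigger, Alert and TriggerEvaluator are hypothetical names, not RIOT's actual classes, and the source does not show how RIOT stores its triggers. Inside a Flink job, a common way to let one stream (the triggers) update rules applied to another stream (the device states) is the broadcast state pattern, but whether RIOT uses it is an assumption.

```kotlin
// Hypothetical domain types; RIOT's real classes may differ.
data class DeviceState(val deviceId: String, val status: String)
data class Trigger(val id: String, val deviceId: String, val alertOnStatus: String)
data class Alert(val triggerId: String, val deviceId: String, val status: String)

// Holds the current triggers and evaluates each incoming state against them.
// Triggers can be added, replaced or removed at any time, without restarting anything.
class TriggerEvaluator {
    private val triggers = mutableMapOf<String, Trigger>() // keyed by trigger id

    // Called for every element of the trigger stream: add or replace by id.
    fun upsert(trigger: Trigger) { triggers[trigger.id] = trigger }

    fun remove(triggerId: String) { triggers.remove(triggerId) }

    // Called for every element of the device-state stream.
    fun evaluate(state: DeviceState): List<Alert> =
        triggers.values
            .filter { it.deviceId == state.deviceId && it.alertOnStatus == state.status }
            .map { Alert(it.id, state.deviceId, state.status) }
}
```

Keeping triggers in a map keyed by id makes "change a trigger" the same operation as "add a trigger": the new version simply replaces the old one.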

The components of the architecture and their interactions are shown above. In this demo, only the top components (Backend and RIOT) are described; the others are out of scope and are shown only to make clear how the data flows through the pipeline.

The Backend and the UI interact through WebSockets and a REST API. The REST API lets the user create triggers on the state changes of the devices; these triggers can lead RIOT to create alerts, which are published to the message broker (Apache Kafka) for any client to consume. The WebSocket connection is used to update the dashboard with the latest state of each device, which is read through the interactive queries that Apache Flink provides on its internal state: the Backend has an endpoint that triggers a call to the Flink cluster to fetch the state of a key.
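The source does not say how the Backend decides what to push over the WebSocket. One hypothetical approach, sketched below in plain Kotlin, is to remember the last snapshot sent to the dashboard and push only the devices whose state changed in the latest query result. DashboardUpdater and changesSince are illustrative names, not part of RIOT.

```kotlin
// Hypothetical Backend helper: given the latest device states read via queryable
// state, return only the entries the dashboard has not seen yet.
class DashboardUpdater {
    // deviceId -> state (for example the JSON string stored in the Flink MapState)
    private var lastSent: Map<String, String> = emptyMap()

    fun changesSince(latest: Map<String, String>): Map<String, String> {
        val changed = latest.filter { (deviceId, state) -> lastSent[deviceId] != state }
        lastSent = lastSent + latest // remember what the dashboard now shows
        return changed
    }
}
```

Each entry returned by changesSince would then be sent over the WebSocket; unchanged devices cost nothing on the wire.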

Implementation

Enabling queryable state on a Flink cluster takes two steps:

  1. copy the flink-queryable-state-runtime-* JAR file from the /opt folder to the /lib folder of the Flink distribution;
  2. in the configuration file flink-conf.yaml, set the property queryable-state.enable to true.
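The two steps above can be verified with a small preflight check before starting the cluster. This is a hypothetical helper, not part of Flink; it assumes the standard distribution layout (a Flink home directory containing conf/, lib/ and opt/) and reads flink-conf.yaml as flat "key: value" lines.

```kotlin
import java.io.File

// Hypothetical preflight check for enabling queryable state.

// Step 1: is a flink-queryable-state-runtime JAR present in lib/?
fun hasQueryableStateRuntimeJar(flinkHome: File): Boolean =
    File(flinkHome, "lib").listFiles()?.any {
        it.name.startsWith("flink-queryable-state-runtime") && it.name.endsWith(".jar")
    } ?: false

// Step 2: does conf/flink-conf.yaml set queryable-state.enable to true?
// Lines are treated as "key: value"; anything after '#' is a comment.
fun isQueryableStateEnabled(flinkHome: File): Boolean {
    val conf = File(flinkHome, "conf/flink-conf.yaml")
    if (!conf.exists()) return false
    return conf.readLines()
        .map { it.substringBefore('#').trim() }
        .filter { it.contains(':') }
        .any { line ->
            line.substringBefore(':').trim() == "queryable-state.enable" &&
                line.substringAfter(':').trim().equals("true", ignoreCase = true)
        }
}
```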

To verify that queryable state is properly configured, look for the following line in the TaskManager logs:

Started the Queryable State Proxy Server @ …

This line means that the queryable state threads are up and running and waiting for requests.
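If you want to automate this check (for example in a deployment script or an integration test), a minimal sketch is to scan the TaskManager log for the marker quoted above. The function names are hypothetical, and the log file location depends on your setup; only the text before the "@" is matched, since the address after it varies.

```kotlin
import java.io.File

// Marker logged by a TaskManager once its queryable state proxy has started.
val PROXY_STARTED_MARKER = "Started the Queryable State Proxy Server @"

// Returns the log lines announcing that the proxy is up.
fun proxyStartLines(lines: Sequence<String>): List<String> =
    lines.filter { it.contains(PROXY_STARTED_MARKER) }.toList()

// Convenience wrapper for a TaskManager log file (path is setup-specific).
fun isQueryableStateProxyStarted(logFile: File): Boolean =
    logFile.useLines { proxyStartLines(it).isNotEmpty() }
```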

Further information about the configuration can be found in the Flink documentation. Be aware of the default ports in use:

  • QueryableStateServer: 9067
  • QueryableStateClientProxy: 9069 (this will be the port used by your external service to query state from Flink Cluster)
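Before wiring up the client, it can help to check that the proxy port is reachable from the machine running your external service. In the Flink configuration these ports correspond to queryable-state.proxy.ports and queryable-state.server.ports. The sketch below is a hypothetical helper that only opens a TCP connection, so it shows that something is listening, not that it is the Flink proxy.

```kotlin
import java.io.IOException
import java.net.InetSocketAddress
import java.net.Socket

// Default ports from the list above.
val QUERYABLE_STATE_SERVER_PORT = 9067
val QUERYABLE_STATE_PROXY_PORT = 9069

// Returns true if a TCP connection to host:port can be opened within timeoutMs.
fun isPortReachable(host: String, port: Int, timeoutMs: Int = 1000): Boolean =
    try {
        Socket().use { it.connect(InetSocketAddress(host, port), timeoutMs) }
        true
    } catch (e: IOException) {
        false // refused, timed out, or unknown host
    }
```

For example, isPortReachable("my-taskmanager", QUERYABLE_STATE_PROXY_PORT) should be true from the Backend's host before QueryableStateClient is created; "my-taskmanager" is a placeholder hostname.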

Now that you know how to enable the Queryable State feature on the cluster, it is time to review the implementation details.

Externalizing state

Flink can expose the data of any keyed state, and queries against it are read-only, so external services cannot change the job's internal state.

Attention: When querying a state object, that object is accessed from a concurrent thread without any synchronization or copying. This is a design choice, as any of the above would lead to increased job latency, which we wanted to avoid. Since any state backend using Java heap space, e.g. MemoryStateBackend or FsStateBackend, does not work with copies when retrieving values but instead directly references the stored values, read-modify-write patterns are unsafe and may cause the queryable state server to fail due to concurrent modifications. The RocksDBStateBackend is safe from these issues.

Flink documentation
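The warning above is about sharing: with a heap-based backend, the query thread reads the very object the job is updating. The plain-Kotlin analogy below (no Flink involved; function names are hypothetical, and a HashMap stands in for heap state) shows the difference between mutating a stored value in place and replacing it with a new immutable value. Note that replacing values only helps with values mutated in place; the state container itself (for example the map behind a heap MapState) can still be modified while a query reads it, which is why the documentation points to RocksDBStateBackend as the safe option.

```kotlin
// Analogy only: a MutableMap plays the role of heap-based keyed state, and a
// reference obtained from it plays the role of a concurrent query's view.

// Read-modify-write in place: anyone holding the reference sees the mutation.
fun inPlaceUpdate(state: MutableMap<String, MutableList<String>>, key: String, event: String) {
    state.getValue(key).add(event)
}

// Replace with a new immutable value: a reader holding the old reference
// keeps a consistent (if slightly stale) snapshot.
fun replaceUpdate(state: MutableMap<String, List<String>>, key: String, event: String) {
    state[key] = state.getValue(key) + event
}
```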

Calling the method .setQueryable(String queryableStateName) on the state descriptor makes the keyed state queryable. The following code shows how to expose the state under the name devices-state:

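import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.configuration.Configuration
// field of the enclosing keyed rich function that holds the per-key state
@transient private var windowDevicesState: MapState[String, String] = _
override def open(parameters: Configuration): Unit = {
  val mapStateDescriptor = new MapStateDescriptor[String, String](
    "windowDevicesState",
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO)
  // expose this state to external clients under the name "devices-state"
  mapStateDescriptor.setQueryable("devices-state")
  windowDevicesState = getRuntimeContext.getMapState[String, String](mapStateDescriptor)
}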

The queryable state name devices-state is the name under which the state is queryable by the client.

Flink offers another way to expose state: calling the method asQueryableState on a KeyedStream. You can find more information in the documentation.

Now that you know how to expose state to an external application, let's look at how to query it.

Our setup uses Kotlin, but any JVM application can query the state of a running Flink job.

You need two dependencies on your classpath:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-client-java</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.10.0</version>
</dependency>

The QueryableStateClient is initialized with a TaskManager hostname and the port on which the queryable state proxy is listening (9069 by default) (code):

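import org.apache.flink.queryablestate.client.QueryableStateClient
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toMono

@Component
class FlinkStateService : StateService {
    @Autowired
    private lateinit var producer: KafkaProducer<State> // the project's own producer wrapper
    private lateinit var client: QueryableStateClient

    // host: a TaskManager hostname; port: its queryable state proxy port (9069 by default)
    override fun init(host: String, port: String): Mono<Boolean?> {
        client = QueryableStateClient(host, port.toInt())
        return true.toMono()
    }
    ...
}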

Once the client is initialized, you can query the state (code):

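import com.google.gson.Gson
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.MapState
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.queryablestate.client.QueryableStateClient
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toMono
import java.util.concurrent.CompletableFuture

@Component
class FlinkStateService : StateService {
    ...
    private val gson = Gson()

    // id: the key to look up; jobId: the hex JobID shown in the Flink Web UI
    override fun getState(id: String, jobId: String): Mono<List<State>?> {
        // get() blocks until the queryable state proxy answers
        val states: MapState<String, String> = queryState(id, JobID.fromHexString(jobId), client).get()
        // each map value is a JSON-encoded State
        return states.iterator().asSequence()
            .map { entry -> gson.fromJson(entry.value, State::class.java) }
            .toList()
            .toMono()
    }

    // publishes incoming states to the Kafka topic "devices.state"
    override fun register(alert: Mono<State>): Mono<*> {
        return alert.subscribe {
            producer.send("devices.state", it.getId().toString(), it)
            true
        }.toMono()
    }

    fun queryState(key: String, jobId: JobID, client: QueryableStateClient): CompletableFuture<MapState<String, String>> {
        // key and value types must match the descriptor used in the Flink job
        val descriptor: MapStateDescriptor<String, String> = MapStateDescriptor(
            "windowDevicesState",
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO
        )
        val executionConfig = ExecutionConfig()
        executionConfig.enableForceKryo()
        client.setExecutionConfig(executionConfig)
        // "devices-state" is the name passed to setQueryable(...) in the job
        return client.getKvState(jobId, "devices-state", key, Types.STRING, descriptor)
    }
}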

The method getKvState returns a CompletableFuture<S> that eventually holds the state for the given key, queryable state name, and JobID. The JobID can be found in the Flink Web UI.
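Because the Backend receives the JobID as a string and the query returns a future, two small guards can make the endpoint more robust. The sketch below is hypothetical (looksLikeJobId and awaitState are not Flink APIs): it checks that a string has the shape of a JobID (16 bytes, displayed as 32 hexadecimal characters) before calling JobID.fromHexString, and waits for the future with a timeout instead of calling get() with no limit.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// A Flink JobID is 16 bytes, shown in the Web UI as 32 hexadecimal characters.
private val JOB_ID_PATTERN = Regex("[0-9a-fA-F]{32}")

fun looksLikeJobId(s: String): Boolean = JOB_ID_PATTERN.matches(s)

// Waits at most timeoutMs for a state query instead of blocking indefinitely.
// Returns null if the query failed (for instance, the job is not running)
// or did not complete in time.
fun <S> awaitState(future: CompletableFuture<S>, timeoutMs: Long): S? =
    try {
        future.get(timeoutMs, TimeUnit.MILLISECONDS)
    } catch (e: TimeoutException) {
        null
    } catch (e: ExecutionException) {
        null
    }
```

In getState, awaitState(queryState(...), 2000) could replace the bare get(); the 2000 ms value is an arbitrary example, not a recommendation from Flink.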

Conclusion

Queryable state has some limitations: the client API is still evolving and may introduce breaking changes; the heap-based state backends (MemoryStateBackend and FsStateBackend) should be avoided, since they are unsafe for read-modify-write patterns; and queryable state is bound to the life cycle of the job. Apart from these limitations, you should be fine using this handy feature to build your real-time dashboard.

This started as a side project to show what is going on with my IoT devices, and thanks to Flink's simplicity it now integrates a lot more.

Source code:

Real-Time state IoT (RIOT)

Real-Time state IoT Backend
