Kafka Demo with InterSystems IRIS



What's new in this version

Initial Release

InterSystems IRIS Data Ingestion and Schema Evolution Example

See a video of this demo!

Demo of InterSystems IRIS with Kafka, Schema Registry, AVRO and Schema Migration

This demo allows you to show:

  • A Bank Simulator generating AVRO messages and sending them over Kafka to InterSystems IRIS
  • InterSystems IRIS Multi-model capabilities (AVRO, JSON, Objects, SQL and MDX)
  • How InterSystems IRIS can transform the AVRO object into a canonical structure using transformations and lookups
  • How InterSystems IRIS can orchestrate this work and start a human workflow in case of a problem during the transformations
  • How InterSystems IRIS can provide bidirectional data lineage (from source to canonical and vice-versa)
  • How InterSystems IRIS pulls new schemas from a Kafka Schema Registry and generates the data structures automatically to support schema evolution

This demo uses Confluent's Kafka and their docker-compose sample.

How to run the demo

WARNING: If you are running on a Mac or Windows, you must give Docker at least 5 GB of RAM for this demo to run properly. Also, please check this troubleshooting document if you have problems starting the demo with docker-compose. Insufficient disk space for the Docker VM is the most common cause of trouble.

To run the demo on your PC, make sure you have Git and Docker installed on your machine.

Clone this repository to your local machine to get the entire source code. Don't worry, you don't need to rebuild the demo from its source. It is just easier this way. After cloning the repo, change to its directory and start the demo:

git clone https://github.com/intersystems-community/irisdemo-demo-kafka
cd irisdemo-demo-kafka
docker-compose up

When starting, you will see lots of messages from all the containers that are starting. That is fine. Don't worry!

When it is done, it will just hang there, without returning control to you. That is fine too. Just leave this window open. If you press CTRL+C in this window, docker-compose will stop all the containers and the demo with them.

After all the containers have started, open a browser at http://localhost:10001/csp/appint/demo.csp to see the landing page of the demo. When requested, use the credentials SuperUser/sys to log in.

You are going to see a page like this:

Architecture of Demo

This is the landing page of the demo. Everything on this image is clickable. The data simulator is at the left of the image. Just click on its square to open it. It generates the AVRO messages and sends them to Kafka. Here is an example of the UI of the simulator:

Simulator Running

After clicking on Run Test on the simulator, go back to the demo landing page and click on the green arrow at the right that reads Normalized Data. You will see a list of messages that are coming into InterSystems IRIS and being normalized. Click on the green link of one of them to see the data lineage (message trace):

Data Lineage

Types of Messages Generated by the Simulator

On this demo, we use a bank data simulator to generate AVRO messages and send them over Kafka to InterSystems IRIS. The simulator is driven by a simple UI. Once you have started docker-compose according to the instructions above, you can open the Simulator UI directly at http://localhost:10000 or through the demo landing page as described above.

Here are examples of the AVRO messages sent by the simulator in JSON format so you can see them.

corebanking.com.irisdemo.banksim.avroevent.NewCustomerAvroEvent

{
    "accountNumber":"0000002",
    "address": 
    { 
        "city":"York",
        "phone":"2989549246",
        "state":"NJ"
    },
    "customerId":3,
    "eventDate":"2020-06-15T00:15:29.904Z",
    "eventId":2,
    "initialAmount":96862.53125,
    "name":"Oswald Newman"
}

To see a message like this, start the simulator, wait a couple of seconds, and then open:

http://127.0.0.1:10001/csp/appint/rest/data/corebanking.com.irisdemo.banksim.avroevent.NewCustomerAvroEvent/2
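The same lookup can be scripted. The sketch below assumes that the REST path follows the pattern of the single URL above (base path, then class name, then numeric ID). That pattern is inferred from this one example and is not a documented API, and the helper names are hypothetical. The endpoint may also ask for the demo credentials, which this sketch does not handle.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

# Assumption: base path inferred from the example URL in this README.
BASE_URL = "http://127.0.0.1:10001/csp/appint/rest/data"


def event_url(class_name, event_id, base_url=BASE_URL):
    """Build the URL of one stored event: <base>/<class name>/<id>."""
    return f"{base_url}/{quote(class_name, safe='')}/{event_id}"


def fetch_event(class_name, event_id):
    """Fetch one event and parse it as JSON (needs the demo to be running)."""
    with urlopen(event_url(class_name, event_id), timeout=10) as response:
        return json.load(response)
```

With the demo running and the simulator started, calling fetch_event with the class name above and ID 2 should return a dictionary shaped like the NewCustomerAvroEvent example.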

corebanking.com.irisdemo.banksim.avroevent.DemographicsAvroEvent

{
    "address":
    {
        "city":"New Boston",
        "phone":"5939642280",
        "state":"OR"
    },
    "customerId":43693,
    "eventDate":"2020-06-15T00:15:31.904Z",
    "eventId":50003,
    "name":"Yusef Lopez"
}

corebanking_com_irisdemo_banksim_avroevent.LoanContractAvroEvent

{
    "account":"0036270",
    "amount":4778,
    "contractId":100001,
    "customerId":72539,
    "eventDate":"2020-06-15T00:15:32.904Z",
    "eventId":50004
}

corebanking_com_irisdemo_banksim_avroevent.TransferAvroEvent

{
    "amount":-43294.621410429550451,
    "customerAccount":"0007064",
    "customerId":14127,
    "eventDate":"2020-06-15T00:15:30.904Z",
    "eventId":50001,
    "otherAccount":"0031793",
    "reference":"",
    "transferType":"TRANSFER_OUT"
}

How the Demo Uses AVRO and AVRO Schemas

A very good introduction to AVRO can be found here and a very nice introduction to its schema language can be found here.

Many financial services organizations are using Kafka to ship data from one system to another and also to store data indefinitely for when a system needs it. Kafka does not define the structure of its messages. They can be as simple as a string or an integer, or have a custom format defined by the developer and shipped as a byte array. But instead of coming up with yet another serialization format, companies are adopting standard serialization formats such as AVRO and Protobuf to share data among applications.

The advantage of using AVRO or Protobuf is that they provide a standard way of defining a shared schema for messages. In the case of AVRO, the schemas are defined using JSON, which is easy for both humans and machines to read and write. Programming languages such as Java, Python, C, etc. can then use these schemas to generate data structures such as classes that programmers can use to parse and produce messages.

So, while Kafka is the medium and is agnostic of the payloads it delivers, AVRO provides a standard serialization format so that programs written in any language can communicate effectively over Kafka using their own data structures.

On this demo, we implemented enough of the AVRO schema language to be able to generate InterSystems IRIS classes/tables to hold the data. The Schema Registry Service will periodically pull the AVRO schemas registered there and build InterSystems IRIS classes to hold the messages sent over Kafka.
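To make the idea of generating a table from a schema more concrete, here is a minimal sketch that is not the demo's actual generator. It walks an AVRO record schema and renders a CREATE TABLE statement. The type mapping, the flattening of nested records with an underscore prefix, the function names and the sample schema are all assumptions made for illustration. The classes that InterSystems IRIS really generates may be structured differently.

```python
# Assumed mapping from AVRO primitive types to SQL column types
# (illustrative only; not taken from the demo's source code).
AVRO_TO_SQL = {
    "string": "VARCHAR(255)",
    "int": "INTEGER",
    "long": "BIGINT",
    "float": "DOUBLE",
    "double": "DOUBLE",
    "boolean": "BIT",
}


def columns_for(record, prefix=""):
    """Return (column name, SQL type) pairs for an AVRO record schema.

    Nested records are flattened with an underscore prefix, which is an
    assumption of this sketch.
    """
    columns = []
    for field in record["fields"]:
        name = prefix + field["name"]
        field_type = field["type"]
        if isinstance(field_type, dict) and field_type.get("type") == "record":
            columns.extend(columns_for(field_type, prefix=name + "_"))
        else:
            columns.append((name, AVRO_TO_SQL[field_type]))
    return columns


def create_table_sql(table, record):
    """Render a CREATE TABLE statement for the flattened columns."""
    body = ",\n".join(f"    {name} {sql_type}" for name, sql_type in columns_for(record))
    return f"CREATE TABLE {table} (\n{body}\n)"


# Shortened, hypothetical schema in the style of the events used by the demo.
SAMPLE_SCHEMA = {
    "type": "record",
    "name": "NewCustomerAvroEvent",
    "namespace": "com.irisdemo.banksim.avroevent",
    "fields": [
        {"name": "eventId", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "initialAmount", "type": "double"},
        {
            "name": "address",
            "type": {
                "type": "record",
                "name": "mailing_address",
                "fields": [
                    {"name": "state", "type": "string"},
                    {"name": "city", "type": "string"},
                ],
            },
        },
    ],
}

if __name__ == "__main__":
    print(create_table_sql("Demo.NewCustomerAvroEvent", SAMPLE_SCHEMA))
```

The sketch only handles primitive types and nested records. Unions, arrays, maps and logical types would need extra cases.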

You can click on the Schema Registry image at the demo landing page or just open the following URL to see the schemas that are registered there:

http://localhost:8081/subjects/

If you don't see anything when opening this URL, make sure the demo is running and that you have run the bank simulator at least once and let at least 100K messages be sent to Kafka. On this demo, the schemas are registered as messages are sent. On a typical production system, that would not happen: the Schema Registry dictates which schemas message producers can use and blocks a producer from sending a message for which no schema is registered.

When you see a schema under the URL above, you can also ask for its versions:

http://localhost:8081/subjects/com.irisdemo.banksim.avroevent.NewCustomerAvroEvent/versions/

This will probably return an array with a single value (version 1). That is because we are not playing with multiple versions of a schema on this demo. We plan to do this in future iterations of the demo. But you can keep going and ask to see a specific version of a schema:

http://localhost:8081/subjects/com.irisdemo.banksim.avroevent.NewCustomerAvroEvent/versions/1

Here is an example:

{
    "subject":"com.irisdemo.banksim.avroevent.NewCustomerAvroEvent",
    "version":1,
    "id":1,
    "schema":"{\"type\":\"record\",\"name\":\"NewCustomerAvroEvent\",\"namespace\":\"com.irisdemo.banksim.avroevent\",\"fields\":[{\"name\":\"eventId\",\"type\":\"long\"},{\"name\":\"eventDate\",\"type\":\"string\",\"logicalType\":\"timestamp-millis\"},{\"name\":\"customerId\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"accountNumber\",\"type\":\"string\"},{\"name\":\"initialAmount\",\"type\":\"double\"},{\"name\":\"address\",\"type\":{\"type\":\"record\",\"name\":\"mailing_address\",\"fields\":[{\"name\":\"state\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"phone\",\"type\":\"string\"}]}}]}"
}

This is not a pure AVRO schema. The AVRO schema is actually under the schema attribute, stored as a string. The rest is just a schema registry structure that holds the schema and its metadata, such as its id and version. Here is the actual AVRO schema:

{
    "type":"record",
    "name":"NewCustomerAvroEvent",
    "namespace":"com.irisdemo.banksim.avroevent",
    "fields":
    [
        {"name":"eventId","type":"long"},
        {"name":"eventDate","type":"string","logicalType":"timestamp-millis"},
        {"name":"customerId","type":"long"},
        {"name":"name","type":"string"},
        {"name":"accountNumber","type":"string"},
        {"name":"initialAmount","type":"double"},
        {"name":"address",
        "type": {
            "type":"record",
            "name":"mailing_address",
            "fields":
                [
                    {"name":"state", "type":"string"},
                    {"name":"city","type":"string"},
                    {"name":"phone","type":"string"}
                ]
            }
        }
    ]
}
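The step from the registry response to the AVRO schema can be sketched in a few lines, since the schema attribute is itself a JSON document encoded as a string. The function names below are hypothetical. The URL layout is the one used by the Schema Registry URLs shown earlier, and the sample record is a shortened copy of the response above.

```python
import json
from urllib.request import urlopen

# Registry address as exposed by the demo on the local machine.
REGISTRY_URL = "http://localhost:8081"


def version_url(subject, version, registry=REGISTRY_URL):
    """URL of one registered version of a subject."""
    return f"{registry}/subjects/{subject}/versions/{version}"


def unwrap_schema(registry_record):
    """Parse the AVRO schema held as a JSON string in the 'schema' attribute."""
    return json.loads(registry_record["schema"])


def full_name(avro_schema):
    """Return '<namespace>.<name>' for a record schema."""
    return f"{avro_schema['namespace']}.{avro_schema['name']}"


def fetch_schema(subject, version=1, registry=REGISTRY_URL):
    """Fetch a version record and return its AVRO schema (needs the demo running)."""
    with urlopen(version_url(subject, version, registry), timeout=10) as response:
        return unwrap_schema(json.load(response))


# Offline example: a shortened copy of the registry response shown above.
SAMPLE_RECORD = {
    "subject": "com.irisdemo.banksim.avroevent.NewCustomerAvroEvent",
    "version": 1,
    "id": 1,
    "schema": json.dumps({
        "type": "record",
        "name": "NewCustomerAvroEvent",
        "namespace": "com.irisdemo.banksim.avroevent",
        "fields": [
            {"name": "eventId", "type": "long"},
            {"name": "customerId", "type": "long"},
            {"name": "name", "type": "string"},
        ],
    }),
}

if __name__ == "__main__":
    schema = unwrap_schema(SAMPLE_RECORD)
    print(full_name(schema), [field["name"] for field in schema["fields"]])
```

Note the two parsing steps: the HTTP response is parsed once to get the registry record, and the schema attribute is parsed a second time to get the AVRO schema itself.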

This AVRO schema is what the Schema Registry Service fetches to generate the InterSystems IRIS tables/classes that hold the data. The service uses a Schema Registry Configuration that defines the schema registry URL and other settings. You can query the configuration with SQL:

SELECT * FROM SchemaRegistry.Config

ID  DriverName  EndPoint                     Name         SubjectNamingStrategy
1   Confluent   http://schema-registry:8081  corebanking  RecordNameStrategy

Where are the tables that hold the original data?

As explained above, the tables are generated automatically inside InterSystems IRIS by the Schema Registry Service. This service is deployed on an InterSystems IRIS production and configured with a SchemaRegistry.Config. We have only one config because we have only one schema registry. This configuration has a name.

The schema registry service generates classes/tables with the following naming convention:

(ConfigName)_(schema namespace).(schema name)

So the schema above will produce a table like this:

corebanking_com_irisdemo_banksim_avroevent.NewCustomerAvroEvent

As we have many AVRO events coming from the bank simulator, we will end up with many schemas. They are all under the same namespace, so their tables will all sit together under the corebanking_com_irisdemo_banksim_avroevent schema.
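The naming convention can be written as a small helper. This is only a sketch: the function name is hypothetical, and the replacement of dots in the namespace with underscores is inferred from the example table name above.

```python
def table_name(config_name, namespace, schema_name):
    """Apply (ConfigName)_(schema namespace).(schema name).

    Dots in the namespace become underscores, as in the example table
    name shown in this README (an inference, not a documented rule).
    """
    sql_schema = f"{config_name}_{namespace}".replace(".", "_")
    return f"{sql_schema}.{schema_name}"


if __name__ == "__main__":
    print(table_name("corebanking", "com.irisdemo.banksim.avroevent", "NewCustomerAvroEvent"))
```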

Events are added to their own tables, and a numeric autogenerated ID column uniquely identifies each message. The Schema Registry Service also generates a table called corebanking.AllObjects that you can use to select all the events stored in these tables in the sequence in which they were originally stored in Kafka. This is a very important table for us. We use it to know which message to process next, which guarantees that messages are processed in the proper sequence.

What does the Schema Normalization Process do?

There is another service on the InterSystems IRIS production called Schema Normalization Service that keeps pulling messages from table corebanking.AllObjects and pushing them to the Schema Normalization Process. These messages are pushed synchronously (the service waits for the correct processing of each message) and this is by design, to continue guaranteeing the proper sequencing of messages.

The process then uses the full name of the message schema to query the table SchemaNormalization.SchemaKeyMapConfig, which tells the process which DTL (Data Transformation Language) class to use to normalize that particular message to the canonical schema. This is another very important table to be aware of.

The Schema Normalization Process is a generic process that can be used to normalize any Kafka/AVRO message, as long as the schema of each message sent to it is configured in SchemaNormalization.SchemaKeyMapConfig and the proper DTLs have been created and are available to the process.

If a problem occurs during the normalization process, the entire processing will temporarily pause and an alert will be sent to the application specialist role (a data steward). A workflow ticket will also be created so that the time to resolution can be measured. This person will be able to see the problem, use the InterSystems IRIS visual trace capabilities to understand what is happening, fix the issue and decide whether the message should be retried or discarded. The processing of messages will then resume.

Temporarily pausing the processing of messages is paramount for proper message sequencing and to guarantee data quality. More advanced options can be implemented, such as pausing processing only for messages involving the customer that was affected.
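The following is a minimal sketch of this pause-and-resume behavior, not the demo's actual implementation. The dictionary stands in for SchemaNormalization.SchemaKeyMapConfig, the plain function stands in for a DTL class, and the state lookup, names and message tuples are all hypothetical.

```python
class NormalizationError(Exception):
    """Raised when a message cannot be normalized."""


# Hypothetical lookup table used by the transformation.
STATE_LOOKUP = {"NJ": "New Jersey"}


def normalize_customer(message):
    """Stand-in for a DTL: map a NewCustomer event to a canonical record."""
    state = message["address"]["state"]
    if state not in STATE_LOOKUP:
        raise NormalizationError(f"unknown state {state!r}")
    return {"FullName": message["name"], "State": STATE_LOOKUP[state]}


# Stand-in for SchemaNormalization.SchemaKeyMapConfig:
# schema full name -> transformation to apply.
SCHEMA_KEY_MAP = {
    "com.irisdemo.banksim.avroevent.NewCustomerAvroEvent": normalize_customer,
}


def process_in_order(messages, start=0):
    """Normalize messages one at a time, in their original order.

    Stops at the first failure so that no later message overtakes it.
    Returns (canonical records, index of the failed message or None).
    """
    canonical = []
    for index in range(start, len(messages)):
        schema_name, payload = messages[index]
        transform = SCHEMA_KEY_MAP[schema_name]
        try:
            canonical.append(transform(payload))
        except NormalizationError:
            return canonical, index  # pause here until someone intervenes
    return canonical, None


SCHEMA = "com.irisdemo.banksim.avroevent.NewCustomerAvroEvent"
MESSAGES = [
    (SCHEMA, {"name": "Oswald Newman", "address": {"state": "NJ"}}),
    (SCHEMA, {"name": "Yusef Lopez", "address": {"state": "OR"}}),
    (SCHEMA, {"name": "Oswald Newman", "address": {"state": "NJ"}}),
]
```

Calling process_in_order(MESSAGES) normalizes the first message and pauses at index 1, because "OR" is missing from the lookup. Once the entry is added, calling process_in_order(MESSAGES, start=1) retries the failed message and then continues with the rest, which mirrors the retry decision made by the application specialist.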

Where is the canonical model?

As explained, the Schema Normalization Process will pick the right DTL transformations to structurally and semantically normalize messages coming from Kafka to the canonical model that we are making available for users.

If you click on the green square at the right of the demo landing page, or on the SQL arrow at the far right, you will be taken to a place where you can run SQL queries like:

select * from Canonical.Customer

If you run this query before starting the simulator, this table will be empty. If you run it after, the table will show the canonical records.

Here are other queries you can run after you start sending messages to InterSystems IRIS:

How many messages has InterSystems IRIS received and what are their statuses

SELECT 
 %ProcessingStatus, count(ID) 
FROM corebanking.AllObjects
group by  %ProcessingStatus 

Find customers that have many bank transactions

select Account->AccountNumber, count(ID) as movements
from Canonical.CheckingAccountMov
group by Account->AccountNumber
order by movements desc

Find customers that have many bank transactions and have loans

select Account->AccountNumber, count(ID) as movements
from Canonical.CheckingAccountMov
where LoanContract is not null
group by Account->AccountNumber
order by movements desc

Get a Customer's checking account rolling balance

select Account->Customer->FullName, Account->AccountNumber, Account->OpeningBalance, MovementDate, MovementType, LoanContract, Reference, TransferId, Amount, RollingBalance
from Canonical.CheckingAccountMov 
where Account->AccountNumber= '0000107' 
order by MovementDate

Please notice that this query shows a very useful SQL extension provided by InterSystems IRIS. This could have been a very complex query, joining three different tables (CheckingAccountMov, Customer and CheckingAccount). Instead, we can focus on the CheckingAccountMov table and just use the arrow syntax (->) to navigate to the other tables and write a much simpler query.
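As a loose analogy only, the arrow syntax reads like following object references in a programming language. The sketch below models the three tables as Python dataclasses and produces rows similar to the rolling balance query. The class and field names are simplified stand-ins, the amounts are whole numbers only to keep the arithmetic exact, and the assumption that RollingBalance equals the opening balance plus the running sum of movements is mine, not something stated by the demo.

```python
from dataclasses import dataclass


@dataclass
class Customer:
    full_name: str


@dataclass
class CheckingAccount:
    account_number: str
    opening_balance: int
    customer: Customer  # reference, like Account->Customer


@dataclass
class CheckingAccountMov:
    account: CheckingAccount  # reference, like the Account column
    movement_date: str
    amount: int


def statement(movements, account_number):
    """Return (full name, date, amount, rolling balance) rows for one account."""
    selected = [m for m in movements if m.account.account_number == account_number]
    rows = []
    balance = None
    for mov in sorted(selected, key=lambda m: m.movement_date):
        if balance is None:
            balance = mov.account.opening_balance
        balance += mov.amount
        # mov.account.customer.full_name mirrors Account->Customer->FullName
        rows.append((mov.account.customer.full_name, mov.movement_date, mov.amount, balance))
    return rows
```

Each attribute chain replaces what would otherwise be an explicit join condition, which is the convenience the arrow syntax brings to the SQL query.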

Human Workflow

The work of transforming the original documents to their canonical form is orchestrated by the Schema Normalization Process. If a message cannot be transformed into the canonical format, the normalization process will start a new workflow task so that an application specialist can intervene.

Until the application specialist intervenes, the processing of messages is interrupted in order to guarantee the integrity of the canonical model.

A simple way to cause a problem is to remove a country from the lookup table and watch the workflow inbox for a ticket to appear.

You will be able to pick a ticket for yourself. Then you can find the trace of the problematic message and look at what happened. Then you can add that country back to the lookup table and use the workflow UI to retry processing the message. You will see that, this time, the message will go through. The message trace will show the entire history of the event. It can be stored for months or years for forensics.

Other demo applications

There are other IRIS demo applications that touch different subjects such as NLP, ML, Integration with AWS services, Twitter services, performance benchmarks etc. Here are some of them:

  • HTAP Demo - Hybrid Transaction-Analytical Processing benchmark. See how fast IRIS can insert and query at the same time. You will notice it is up to 20x faster than AWS Aurora!
  • Twitter Sentiment Analysis - Shows how IRIS can be used to consume Tweets in realtime and use its NLP (natural language processing) and business rules capabilities to evaluate the tweet's sentiment and the metadata to make decisions on when to contact someone to offer support.
  • HL7 Appointments and SMS (text messages) application - Shows how IRIS for Health can be used to parse HL7 appointment messages to send SMS (text messages) appointment reminders to patients. It also shows real time dashboards based on appointments data stored in a normalized data lake.
  • The Readmission Demo - Patient Readmissions are said to be the "Hello World of Machine Learning" in Healthcare. On this demo, we use this problem to show how IRIS can be used to safely build and operationalize ML models for real time predictions and how this can be integrated into a random application. This IRIS for Health demo seeks to show how a full solution for this problem can be built.
  • Fraud Prevention - Apply Machine Learning and Business Rules to prevent frauds in financial services transactions using InterSystems IRIS.
  • Financial Transactions with Fraud and Rewards/cross-sell - Process credit card transactions while keeping a hot data lake current with data aggregated from your core systems. Use this aggregated data to prevent fraud and to verify customer eligibility for rewards as an example of cross-selling.

Report any Issues

Please report any issues in the Issues section.

Category: Technology Example
Works with: InterSystems IRIS
Version: 1.0.0
Last updated: 2020-08-27