While a distributed data mesh empowers teams in a company to securely build modern applications with fewer data dependencies, it also poses challenges for non-product teams. Certain teams within a company may require access to anonymized and denormalized data to further grow the business. In this post, we will take a look at how such teams can use DeltaStream to capture the data they need to do their work, while the data owners stay in control of the security of their data.

Training Machine Learning Models

For the purpose of this exercise, let's assume a Machine Learning team at a financial institution needs access to anonymized user data to build models that reduce fraud, based on the frequency and location of payments made to an account. This team stores its data in a topic in an Apache Kafka cluster that is declared as a Store in DeltaStream:

  mldb.product/mlai_msk# LIST STORES;
      Name     |  Kind   | Access Region | Metadata |  Owner   |      Created at      |      Updated at
  -------------+---------+---------------+----------+----------+----------------------+-----------------------
    mlai_msk   | Kafka   | AWS us-east-1 | {}       | sysadmin | 2023-01-12T20:38:16Z | 2023-01-12T20:38:16Z

and we already have access to the payments made by `payerid` over time:

  CREATE STREAM payments_log (
    paymenttime BIGINT,
    payerid VARCHAR,
    accountid VARCHAR,
    paymentid VARCHAR
  ) WITH ('topic'='topic_name', 'value.format'='JSON');

DDL 1: `payments_log` definition for the `payments_log` Kafka topic

`DDL 1` defines the running log of payments through the product, created using the `CREATE STREAM` statement. The `payments_log` Stream references the `accountid` that is the recipient of each payment, and the `paymentid` that carries extra payment information.
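As a quick illustration of how this Stream can be queried, the interactive query below peeks at payments made to a single account; this is a minimal sketch, and the account ID is a made-up value:

  SELECT paymenttime, payerid, paymentid
  FROM payments_log
  WHERE accountid = 'acct_42';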

In addition to the frequency of payments made to a specific `accountid`, we also need to take into account the location that payments are being made from, so the training model can better detect anomalies over time. We will expand on this in the next section.
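Before moving on to location, the frequency signal itself could be captured with a simple aggregation. The sketch below is illustrative only: the Changelog name is made up, and a production version would likely bound the count with a time window (see the DeltaStream documentation for the exact windowing syntax):

  CREATE CHANGELOG payment_counts_by_account
  AS SELECT
    accountid,
    COUNT(*) AS payment_count
  FROM payments_log
  GROUP BY accountid;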

Sharing Anonymous User Data

With the stream of payments provided in the `payments_log` Stream above, we need to securely denormalize the `payerid` field to also include where payments are coming from, without exposing users' sensitive information. This can be done by the team that owns the additional payer information, which is identified by a `userid` and described by the following Changelog in the `userdb.product` Schema:

  CREATE CHANGELOG userdb.product.users_log (
    registertime BIGINT,
    userid VARCHAR,
    regionid VARCHAR,
    contactinfo STRUCT<email VARCHAR, phone VARCHAR, city VARCHAR, country VARCHAR>,
    PRIMARY KEY(userid)
  ) WITH ('topic'='users', 'value.format'='json');

For simplicity, let's assume all payers are registered as users of the product. At this point, only the users team has access to the `userdb` Database, so for data security reasons the `users_log` Changelog is not accessible to the Machine Learning team. The users team also owns the `payments_log` data, so they have the usage permissions needed to read from and write to that Stream.

Using the following query, we can provide the anonymized user information to the Machine Learning team in real time:

  CREATE STREAM payments_location
  AS SELECT
    p.paymenttime AS paytime,
    u.registertime AS payer_register_time,
    u.regionid AS region,
    u.contactinfo->city AS payment_city,
    u.contactinfo->country AS payment_country,
    p.accountid AS payee,
    p.paymentid AS paymentid
  FROM payments_log p
  JOIN users_log u ON u.userid = p.payerid;

Query 1: Enrich payments with anonymized payer location info using a temporal join on `users_log.userid`

In `Query 1`, we look up the payer represented by `payerid` in the `users_log` Changelog, where it is identified by `userid`. While doing so, we omit `userid`, `contactinfo.email`, and `contactinfo.phone`, as these fields were identified as Personally Identifiable Information (PII) by the users team; leaving them out prevents this data from leaking outside of the `userdb` Database.

As a result of `Query 1`, a new `payments_location` Stream is created that provides the location information for each payment made to an account in addition to the existing payment information:

  CREATE STREAM payments_location (
    paytime BIGINT,
    payer_register_time BIGINT,
    region VARCHAR,
    payment_city VARCHAR,
    payment_country VARCHAR,
    payee VARCHAR,
    paymentid VARCHAR
  ) WITH ('topic'='topicname', 'value.format'='json');

DDL 2: Underlying DDL for the denormalized `payments_location` in `Query 1`

The `DDL 2` statement shows how the `payments_location` Stream was created when `Query 1` was launched.
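Since the resulting Stream carries no PII, the Machine Learning team can consume it directly. For instance, a purely illustrative interactive query confirms that only anonymized fields are exposed:

  SELECT paytime, region, payment_city, payment_country, payee, paymentid
  FROM payments_location;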

Model Training with Real-Time Data

Now, let's assume that additional payment information can be looked up by the `paymentid` field, and that by inspecting the `payments` Stream we find a `chargeinfo` structure that can be very useful to our fraud detection model:

  CREATE STREAM payments (
    id VARCHAR,
    chargeinfo STRUCT<vcc VARCHAR, amount FLOAT, type VARCHAR>,
    payer VARCHAR,
    payee VARCHAR,
    paymenttime BIGINT
  ) WITH ('topic'='topicname', 'value.format'='json');

Using the `payments` DDL, the following query can be created to continuously provide the additional charge information to the ML team:

  CREATE STREAM payments_full
  AS SELECT
    pl.paytime AS paytime,
    pl.payer_register_time AS payer_register_time,
    pl.region AS region,
    pl.payment_city AS payment_city,
    pl.payment_country AS payment_country,
    pl.payee AS payee,
    p.chargeinfo AS charge
  FROM payments_location pl
  JOIN payments p ON p.id = pl.paymentid;

Query 2: Denormalize payment ID into charge information

In `Query 2`, we replace the `paymentid` reference with the charge information so the model training pipeline gets the full picture for finding payment anomalies that may be occurring within our product. As a result, the `payments_full` Stream is created as follows:

  CREATE STREAM payments_full (
    paytime BIGINT,
    payer_register_time BIGINT,
    region VARCHAR,
    payment_city VARCHAR,
    payment_country VARCHAR,
    payee VARCHAR,
    charge STRUCT<vcc VARCHAR, amount FLOAT, type VARCHAR>
  ) WITH ('topic'='topicname', 'value.format'='json');

In addition to getting the right information, the model training pipeline now receives it in real time, allowing the model to evolve faster over time and positively impact the business.
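As one example of how the pipeline might consume this Stream, the sketch below flags unusually large charges as candidate anomalies; the Stream name and the threshold are made up for illustration:

  CREATE STREAM large_payments
  AS SELECT
    payee,
    region,
    charge->amount AS amount,
    charge->type AS charge_type
  FROM payments_full
  WHERE charge->amount > 1000;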

What’s next?

In this post, we looked at some of the techniques that can be used alongside modern products today to securely denormalize data and share it with other teams within the company without granting them access to the original data. While this is a simplified scenario, DeltaStream has extensive support for different data types and data processing operations that fit countless production use cases. Please refer to our developer documentation to learn more about how your scenario can be simplified using DeltaStream.

If you are using streaming storage systems such as Apache Kafka (Confluent Cloud, AWS MSK, Redpanda, or any other Apache Kafka distribution) or AWS Kinesis, you should check out DeltaStream as the platform for processing, organizing, and securing your streaming data. You can schedule a demo to see all these capabilities in the context of a real-world streaming application.