1. Producing the data (10%)
In this task, we will implement an Apache Kafka producer to simulate real-time data
transfer from one repository to another.
Important:
- Do not use Spark in this task
- In this part, all columns should be string type
Your program should send a random number (between 10 and 30, inclusive) of client records
every 5 seconds to the Kafka stream, in 2 different topics based on their origin files.
- For example, if the first random batch of customers' IDs is 1, 2, and 3, you should also
send their bureau data to the bureau topic. For every batch of data, you need to
add a new column 'ts', the current timestamp. The data in the same batch should have the same timestamp.
For instance: batch1: [{ID=xxx,...,ts=123456}, {ID=xxx,...,ts=123456}, ......] (each {...} is one row)
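A minimal sketch of the producer loop, assuming the kafka-python package and a local broker; the topic names 'customer' and 'bureau', the bootstrap server address, and the helper names are illustrative choices, not fixed by the spec. The batch-building helpers are pure Python, so the shared-timestamp rule can be checked without a running broker:

```python
import json
import random
import time

def pick_batch(customers, low=10, high=30):
    """Pick a random number (low..high inclusive) of customer rows."""
    n = random.randint(low, high)
    return random.sample(customers, min(n, len(customers)))

def make_batch(records, ts):
    """Attach the same 'ts' to every row; all values stay strings per the spec."""
    return [dict(row, ts=str(ts)) for row in records]

def run_producer(customers, bureau_by_id, servers="localhost:9092"):
    """Send one batch every 5 seconds to two topics (hypothetical names)."""
    from kafka import KafkaProducer  # assumes kafka-python is installed
    producer = KafkaProducer(
        bootstrap_servers=servers,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    while True:
        ts = int(time.time())
        batch = pick_batch(customers)
        for row in make_batch(batch, ts):
            producer.send("customer", row)  # hypothetical topic name
        for row in batch:
            # Also send the bureau rows for every customer in this batch.
            for b in make_batch(bureau_by_id.get(row["ID"], []), ts):
                producer.send("bureau", b)  # hypothetical topic name
        producer.flush()
        time.sleep(5)
```

`customers` would be the rows of the customer CSV (all columns read as strings), and `bureau_by_id` the bureau rows grouped by customer ID.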
Save your code in Assignment-2B-Task1_producer.ipynb.
2. Streaming application using Spark Structured Streaming (55%)
In this task, we will implement Spark Structured Streaming to consume the data from task 1
and perform predictive analytics.
Important:
- In this task, use PySpark Structured Streaming together with PySpark
Dataframe APIs and PySpark ML
- You are also provided with a pre-trained pipeline model for predicting the
top-up customers. Information on the required inputs of the pipeline model can
be found in the Background section.
1. Write code so that a SparkSession is created using a SparkConf object, which uses
two local cores, has a proper application name, and uses UTC as the timezone.
2. Using the same topic names from the Kafka producer in Task 1, ingest the streaming
data into Spark Streaming, assuming all data arrives in String format.
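A sketch of the ingestion step, assuming the broker address from Task 1; `read_topic` takes the session so it can be reused for both topics, and `parse_record` reflects the producer sending each row as a JSON string:

```python
import json

def read_topic(spark, topic, servers="localhost:9092"):
    """Subscribe to one Kafka topic; Kafka delivers 'value' as bytes, cast to string."""
    return (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", servers)
            .option("subscribe", topic)
            .load()
            .selectExpr("CAST(value AS STRING) AS value"))

def parse_record(value):
    """Each Kafka message body is one JSON row; all fields arrive as strings."""
    return json.loads(value)
```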
3. Then transform the streaming data into the proper formats following the metadata
file schema, similar to assignment 2A. Then use the 'ts' column as the watermark
and set the delay threshold to 5 seconds.
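The casting and watermarking can be sketched as below, assuming the metadata schema is available as a `{column: type}` mapping (a representation chosen here for illustration); `cast_exprs` is pure Python, while `apply_schema` needs a live stream:

```python
def cast_exprs(schema):
    """Build SQL cast expressions from a {column: type} metadata mapping.
    Backticks protect column names containing spaces or hyphens."""
    return ["CAST(`{0}` AS {1}) AS `{0}`".format(name, dtype)
            for name, dtype in schema.items()]

def apply_schema(df, schema):
    # 'ts' must be cast to timestamp in the schema so it can be the watermark.
    out = df.selectExpr(*cast_exprs(schema))
    return out.withWatermark("ts", "5 seconds")
```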
4. Group the bureau stream based on ID with a 30-second window duration, similar to
assignment 2A (same rule for sum and dist).
- Transform the “SELF-INDICATOR” column’s values: if the value is true,
convert it to 1; if the value is false, convert it to 0.
- Sum the rows for numeric type columns, count distinct values for columns
with other data types, and rename them with the postfix '_sum' or '_dist'. (For
example, if we applied the sum function to 'HIGH CREDIT', the new
column’s name would be 'HIGH CREDIT_sum'.)
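The grouping rule above can be sketched as follows, again assuming a `{column: type}` schema mapping. The naming helper is pure Python; the aggregation function imports pyspark lazily so the helper stays testable offline. Note that exact `countDistinct` is not supported on streaming DataFrames, so `approx_count_distinct` is used here as a common substitute:

```python
def agg_plan(schema):
    """Map each column to '<col>_sum' if numeric, else '<col>_dist'.
    'boolean' is treated as numeric because SELF-INDICATOR becomes 1/0."""
    numeric = {"int", "long", "float", "double", "boolean"}
    plan = {}
    for name, dtype in schema.items():
        if name in ("ID", "ts"):
            continue  # grouping/watermark columns are not aggregated
        plan[name] = name + ("_sum" if dtype in numeric else "_dist")
    return plan

def aggregate_bureau(df, schema):
    from pyspark.sql import functions as F  # lazy import; needs a live session
    # Convert SELF-INDICATOR true/false to 1/0 before summing.
    df = df.withColumn("SELF-INDICATOR",
                       F.when(F.col("SELF-INDICATOR") == True, 1).otherwise(0))
    aggs = []
    for name, new_name in agg_plan(schema).items():
        if new_name.endswith("_sum"):
            aggs.append(F.sum(name).alias(new_name))
        else:
            # Exact distinct counts are unsupported on streams.
            aggs.append(F.approx_count_distinct(name).alias(new_name))
    return df.groupBy(F.window("ts", "30 seconds"), "ID").agg(*aggs)
```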
5. Create new columns named 'window_start' and 'window_end', which are the window’s
start time and end time from step 2.4. Then inner join the 2 streams based on 'ID';
only customer data received within the window time is accepted.
For example, if customer data with ID '3' is received at 10:00, it is accepted only
when the window of the corresponding bureau data contains 10:00 (e.g. window
start: 9:59, end: 10:00).
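A sketch of the join step, with pyspark imported lazily; it assumes the windowed bureau stream still carries the struct column named `window` produced by `groupBy(window(...))`:

```python
def join_streams(customer_df, bureau_windowed):
    from pyspark.sql import functions as F  # lazy import; needs a live session
    # Flatten the window struct into the two required columns.
    b = (bureau_windowed
         .withColumn("window_start", F.col("window.start"))
         .withColumn("window_end", F.col("window.end"))
         .drop("window"))
    # Inner join on ID, keeping only customer rows whose 'ts' falls inside
    # the bureau window, as in the 10:00 example above.
    cond = ((customer_df["ID"] == b["ID"]) &
            (customer_df["ts"] >= b["window_start"]) &
            (customer_df["ts"] <= b["window_end"]))
    return customer_df.join(b, cond, "inner").drop(b["ID"])
```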
6. Persist the above result in parquet format. (When you save the data to parquet
format, you need to rename “Top-up Month” to “Top-up_Month” first, and only keep
these columns: “ID”, “window_start”, “window_end”, “ts”, “Top-up_Month”.) Renaming
“Top-up Month” only happens in this question.
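The parquet sink can be sketched as below; the output and checkpoint paths are left as parameters since the spec does not fix them:

```python
# The exact columns the spec says to keep for this sink.
KEEP_COLS = ["ID", "window_start", "window_end", "ts", "Top-up_Month"]

def persist_parquet(df, path, checkpoint):
    renamed = (df.withColumnRenamed("Top-up Month", "Top-up_Month")
                 .select(KEEP_COLS))
    return (renamed.writeStream
            .format("parquet")
            .option("path", path)
            .option("checkpointLocation", checkpoint)  # required for file sinks
            .outputMode("append")
            .start())
```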
7. Load the given machine learning model and use it to predict whether users
will join the top-up service. Save the results in parquet format. (When you save
the data to parquet format, you need to rename “Top-up Month” to “Top-up_Month”
first, and only keep these columns: “ID”, “window_start”, “window_end”, “ts”,
“prediction”, “Top-up_Month”.) Renaming “Top-up Month” happens in this
question as well.
8. Only keep the customers predicted as our target customers (willing to join the top-up
service). Normally, we should only keep “Top-up=1”. But due to the limited
performance of our VM, if your process is extremely slow, you can abandon the filter
and keep all of the data. Then for each batch, show the epoch id and count of the
dataframe.
If the dataframe is not empty, transform the data into the following key/value format,
where the key is the 'window_end' column and the value is the number of top-up
service customers in the different states (in JSON format). Then send it to Kafka
with a proper topic. These data will be used for the real-time monitoring in Task 3.
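Step 8 can be sketched as a `foreachBatch` handler; the state column name `STATE`, the topic name, and the helper names are assumptions for illustration. The key/value construction is pure Python so it can be checked without Spark or Kafka:

```python
import json
from collections import Counter

def to_kv(rows, state_col="STATE"):
    """For each window_end, count top-up customers per state and emit one
    (key, value) pair: key is the window end time, value is a JSON object
    of per-state counts."""
    by_window = {}
    for row in rows:
        key = str(row["window_end"])
        by_window.setdefault(key, Counter())[row[state_col]] += 1
    return [(k, json.dumps(dict(v))) for k, v in by_window.items()]

def process_batch(df, epoch_id, producer, topic="topup-monitor"):
    """foreachBatch sink: show the epoch id and count, then, if the batch is
    not empty, push per-state counts to Kafka (hypothetical topic name)."""
    rows = [r.asDict() for r in df.collect()]
    print("epoch:", epoch_id, "count:", len(rows))
    if rows:
        for key, value in to_kv(rows):
            producer.send(topic,
                          key=key.encode("utf-8"),
                          value=value.encode("utf-8"))
        producer.flush()
```

This handler would be wired in with `df.writeStream.foreachBatch(lambda d, e: process_batch(d, e, producer)).start()`.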