CARLETON UNIVERSITY
Department of Systems and Computer Engineering
SYSC 5207 Distributed Systems Engineering (Fall 2021)
Course Project
[Please indicate your name and student no. on the cover page and each page of the
document. Start Part II on a new page.]
PART I
1. In this question you will do a literature survey based on the two papers (attached)
that focus on the processing of documents involving large volumes of text data:
1. Chanda, B., and Majumdar, S., “Filtering and Storing User-preferred Data: an Apache
Spark Based Approach,” IEEE Intl Conf on Dependable, Autonomic and Secure Computing,
Intl Conf on Pervasive Intelligence and Computing, Intl Conf on Cloud and Big Data
Computing, Intl Conf on Cyber Science and Technology Congress, Online, August 2020, pp.
679-685.
2. Chanda, B., and Majumdar, S., “A Technique for Extracting User Specified Information
from Streaming Data”, Proc. International Symposium on Performance Evaluation of
Computer and Telecommunication Systems (SPECTS), Online, July 2021.
Prepare a report (at most 2 pages long) that outlines the important issues discussed in the
papers. Your literature survey should include: (a) a critical appreciation of the research
reported in the papers; (b) a comparison of the different important resource management
techniques proposed by the authors that are described in these papers. Also, report any
insights gained into system behaviour and performance from the papers.
NOTE: The report must be typewritten. The length of this report is strictly limited to 2
pages (double spacing, a font size of 12, and a 1-inch margin on all sides). Material
beyond this page limit will not be read.
PART II
2. Consider four identical parallel applications each of which comprises four independent
processes with execution times: T, 2T, 3T and 4T. Consider two cases. In case 1, the four
applications are run in sequence. The first application is run first. When it completes it is
followed by the second application and so on. Preemption of processes is not allowed
and processor allocation is done in such a way that the minimum completion time is
achieved for each application. In case 2, the four applications are run simultaneously and
the Processor Sharing scheduling discipline (that was discussed in class) is used. Note
that in this case the processors are shared equally among all the ready processes in all the
applications. Consider a system with two processors (CPUs):
(a) What is the mean job turnaround time for case 1?
(b) What is the mean job turnaround time for case 2?
Which of these two cases (case 1 or case 2) will give rise to the smaller mean job
turnaround time?
Note that the turnaround time of a job includes both its waiting and execution times.
Justify your answer. Show all your work.
NOTE: Please start Part II on a separate page. Please submit a PDF file
Due: December 6 (6:00 PM).
Filtering and Storing User Preferred Data: an Apache Spark Based Approach
Bannya Chanda
Dept of Systems and Computer Engineering
Carleton University
Ottawa, Canada
e-mail: bannya.chanda@carleton.ca
Shikharesh Majumdar
Dept of Systems and Computer Engineering
Carleton University
Ottawa, Canada
e-mail: majumdar@sce.carleton.ca
Abstract— This work-in-progress paper focuses on a filtering
technique based on user preferences. It uses parallel processing
and machine learning to effectively filter out user preferred
data from a large raw data set. Although large volumes of data
are generated, a user is often interested in only select types
(classes) of such data. The motivation behind this research is to
devise an effective and efficient filtering technique for
extracting user preferred data from large data sets. Storing
only filtered data and discarding the remaining data can
decrease latency in searching for specific information within a
data set. It can also decrease the size of the storage required
for storing these data. Such a filtering method that uses data
classification techniques can give rise to high processing
latencies. An algorithm and system that use both parallel
processing and machine learning are presented. A proof-of-
concept prototype is built on the Apache Spark parallel
processing platform. Analysis of the results of preliminary
experiments demonstrates the viability of the investigated
technique.
Keywords—Apache Spark, parallel processing, text
classification, preference-based filtering
I. INTRODUCTION
Text-based data are continuously increasing in
importance. From education to business to personal use, text-
based data have become an essential and integral part of
modern society. Users are reliant on text data such as articles
in newspapers or journals, meeting minutes, and tweets.
Furthermore, actions in their daily lives are often dependent
on the accuracy of extracting information that is useful to
them from this text-data set. With this comes the challenge
of searching this data set for the required information. This
becomes even more difficult when the data set is large
because the required information becomes more time
consuming to find. This paper focuses on a technique for
filtering raw data sets and storing only the user-specified
“preferred” data as filtered data. The user can then search
within these filtered data. This will reduce both the search latency as
well as the volume of data that needs to be stored for a given
user.
A user is often interested in specific topics and keywords.
These may include specific names such as names of persons,
places, and medicines or generic classes such as dates and
sporting events. Consider an example use case in which raw
data are comprised of all meeting conversations (converted
into text) or meeting minutes recorded directly as text. An
employee (user) may be interested in specific names of
colleagues, clients, products or all such keywords that fit a
general “name” class and in dates. If one can store only such
keywords and discard the rest of the raw data, it will not only
save space but will also speed up the search of the data based
on specific user queries.
When extracting preferred information from a raw data
set, filtering operations can consume a significant amount of
time and can benefit from the speedup provided by a parallel
processing platform. Hadoop/MapReduce is a popular
platform for processing large volumes of data [1]. One
drawback of this system is that it stores intermediate results
in disks at each processing iteration [2]. Apache Spark
alleviates this problem by avoiding disks and performing in-
memory computation [2]. Spark eliminates expensive
intermediate disk writes via a data model called resilient
distributed datasets (RDDs) that can be stored in the memory
and partitioned across nodes for parallel computations [3].
Thus, by avoiding disk operations, Spark enhances the
performance of applications.
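The following minimal PySpark sketch (ours, not the paper's code; the input path is hypothetical) illustrates this in-memory model: once an RDD is cached by a first action, later transformations and actions reuse the in-memory partitions instead of re-reading the data from disk.

    from pyspark import SparkContext

    sc = SparkContext(appName="InMemoryExample")

    # Mark the RDD's partitions for in-memory reuse across operations.
    rdd = sc.textFile("raw_data/*.txt").cache()
    n_lines = rdd.count()  # first action reads the files and populates the cache
    n_hits = rdd.filter(lambda s: "meeting" in s).count()  # served from memory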
This work-in-progress paper proposes a technique for
filtering and storing user preferred information from the raw
data on the Apache Spark platform. This method leverages
the parallel processing framework and the machine learning
library provided by Spark to filter the raw data set based on
user preferences and stores the filtered data to make them
available for the user. A user can query the filtered data that
contain only the user preferred information. This
information, for example, may include data related to dates,
names of persons, and products. If, for example, the user
queries the filtered data set to retrieve the name of a person,
this search method will return only the data related to the
person’s name.
This short paper reports on an initial investigation of this
filtering technique. To the best of our knowledge, little work
exists on the use of parallel processing and machine learning
for filtering out user preferred data from a large raw data set.
The key contributions of this paper include the following:
• An effective filtering technique using Apache Spark.
• A preliminary proof-of-concept prototype for the
technique.
• Demonstration of the viability of the technique and
the speedup in computation through initial
experiments performed on the prototype.
The rest of this paper is organized as follows: Section II
presents a key set of related works. Section III briefly
explains the methodology and preliminary implementation of
the proposed solution. Preliminary experiments and results
are described in section IV. Section V concludes the paper
and outlines the next steps for our future work.
II. RELATED WORK
A representative set of related works that concern the use
of machine learning and parallel systems in text-based
processing is presented. The use of machine learning in text
classification has started receiving attention from
researchers. In [6], the authors used a filtered classifier that
filters text documents and processes the data using a decision
tree classifier to classify the documents. The usage of a single-
layer multisize filters
convolutional neural network for the classification of Urdu
documents is discussed in [7]. In [8], a model is proposed to
perform deep sentiment analysis for documents based on a
convolutional neural network and a long short-term memory
network. Some research works are directed at using parallel
algorithms for text classification and learning user
preferences. Examples include [9], which used the
MapReduce-based Rocchio relevance feedback algorithm for
document classification. In [10], a MapReduce-based vector
space model is used for user profiling using the news items
read by the user. Several studies are based on preference-
based search. In [11], the authors used an efficient Bayesian
filter based on a group of users’ past searching behaviors to
refine the search results by filtering out items for the user. In
[12], the authors presented an integrated technique of user-
filtering and query-refinement based on user preferences to
retrieve user-preferred music. Research has been conducted
using the machine learning library of Apache Spark. In [19],
the authors evaluated the performance of logistic regression,
decision tree and Support Vector Machine (SVM) in the
design of the Apache Spark-based Intrusion Detection
System (IDS). In [20], the authors developed five models
using distributed machine learning based on Apache Spark
for predicting diabetes, and among those models, the
Logistic Regression Classifier achieved the highest accuracy.
In [21], the authors evaluated four machine learning libraries
of Apache Spark to provide an effective solution for the
prediction of heart diseases.
To the best of our knowledge, none of the existing works
have used parallel processing and machine learning for
filtering the raw data set based on user preferences.
III. PROPOSED TECHNIQUE
A. Methodology
The proposed technique is explained with the help of fig.
1. Each component of the system presented in the figure is
described in the following:
• Raw Data: The raw data may be comprised of one or
multiple text files. The proposed filtering technique
filters the raw text data set based on the user
preferences.
• User Preferences: The user provides preferences for
the filtering technique. These preferences are
comprised of a set of keywords that describe a topic
the user is interested in (e.g. dates, names, and
products). The filtering technique uses the user
preferences to filter the raw data and store only the
output of this module for user queries.
• Auto Generation of User Preferences: This optional
module uses machine learning techniques to
generate user preferences using a history of previous
queries sent by the user.
• Filtered Data: These are the data obtained after
running the filtering algorithm on the raw data set.
The data related to the user-specified keywords are
retained in a file.
• User Queries: A user can query the filtered data
module to get related data by providing keywords or
a sentence.
• Application: A program that processes the filtered
data for a specific purpose. For example, an
application for meeting dates is being devised to
generate a meeting schedule for the user based on
the data related to dates recorded in the filtered data
module for a period specified by the user.

Figure 1. Proposed technique
B. Apache Spark Architecture
Apache Spark uses a master/slave architecture containing
one master node and multiple worker nodes [3] (see fig. 2).
The master node is called the driver. The driver
communicates with the worker nodes, which are referred to
as executors. The user application code is written in the
driver, which is responsible for creating a SparkContext,
Resilient distributed data sets (RDDs), and executing all
transformations and actions [3]. A Spark application is
launched on a set of clusters with the help of a resource
manager. The Standalone cluster manager is the default
built-in resource manager of Spark [3]. With this cluster
manager, an application can get all the cores available in the
cluster by default [3]. In this work-in-progress paper, a
standalone cluster manager has been used.

Figure 2. Apache Spark cluster
SparkContext is the main entry point for Apache Spark
[3]. It allows the application to access the Spark cluster with
the help of the resource manager and invoke functions. An
RDD is the main data structure for Apache Spark and
consists of a distributed collection of objects [3]. An RDD
derives new RDDs from the existing RDDs when the
application executes Spark transformation functions [3]. Two
basic transformations are map() and filter(). The Spark map()
transformation takes in any function and applies that function
to every element of an RDD. The Spark filter()
transformation returns a new RDD that contains only the
data satisfying the filter conditions. Spark transformations
are lazy. Spark does not start the execution of the
transformations immediately. Transformations get executed
upon an action. An action is one of the ways of sending data
from executors to the driver. Examples of Spark actions
include collect(), reduce(), take(n) and foreach(). The action
collect() is a common operation that returns the content of
the new RDD to the driver. The action reduce() aggregates
the RDD contents using a function. When any of the actions
are triggered, the Spark driver creates an execution plan or
jobs for the application [3]. A Spark job consists of multiple
tasks that get initiated in response to Spark actions. The
driver schedules these tasks to run on the executors. The task
scheduler resides in the driver and distributes tasks across the
cluster to be executed by executors on their partitions of the
RDD. Then, results are sent back to the driver program for
compilation.
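As a minimal sketch (ours, not the authors' code) of this lazy-evaluation behavior, the map() and filter() transformations below only record lineage; no job runs until an action such as collect() or reduce() is triggered.

    from pyspark import SparkContext

    sc = SparkContext(appName="LazyEvaluation")

    rdd = sc.parallelize(["a date", "a name", "noise"])
    upper = rdd.map(lambda s: s.upper())         # transformation: nothing executes yet
    dates = upper.filter(lambda s: "DATE" in s)  # still lazy; only lineage is recorded

    print(dates.collect())                          # action: a job runs -> ['A DATE']
    print(rdd.map(len).reduce(lambda a, b: a + b))  # aggregates the lengths -> 17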
The other key subsidiary components included in the
Spark core architecture are Spark SQL, Spark Streaming,
and Spark MLlib libraries [3]. Spark SQL provides a
structured data-processing abstraction called DataFrames,
which are essentially distributed collections of data
organized into columns. Spark Streaming allows the
processing of stream data in real-time. Spark MLlib contains
common machine learning algorithms implemented as Spark
operations performed on RDDs. Apache Spark’s MLlib
library contains different algorithms for classification. In this
paper, three different types of classification algorithms –
Multinomial Naïve Bayes, Multinomial Logistic Regression
and Multilayer Perceptron Classifier of Spark MLlib have
been trained with the text classification dataset. Multinomial
Naïve Bayes is a multiclass classification algorithm which
uses the multinomial distribution for each feature and
computes the conditional probability distribution of the label
for given features by applying Bayes’ theorem and uses it for
prediction [22]. Multinomial Logistic Regression is a
supervised classification algorithm that predicts a multiclass
outcome by explaining the relationship between the class and
the extracted features from the input [14]. Multilayer
Perceptron Classifier is a feedforward artificial neural
network which is composed of an input layer for input data,
an output layer to provide prediction about the input data and
an arbitrary number of hidden layers in between the two
layers for computation [23]. Among them, Multinomial
Logistic Regression demonstrates the highest accuracy
(discussed further in section IV-C) and is used in this paper.
Apache Spark MLlib also offers a set of multi-language APIs
for machine learning algorithms. Among them, Pipeline [3]
and K-folds Cross-Validation [3] are used in this research.
Pipeline is a high-level API that consists of a sequence of
algorithms to be run in a specific order to process data. K-
folds Cross-Validation is a re-sampling procedure used to
assess and test the efficiency of a machine learning model if
the input data size is small [16]. It splits the training dataset
into K folds that are used as separate training and test
datasets to fit and validate the machine learning model. It
finds the best model by running the machine learning
algorithms with different combinations of parameters and
training and testing datasets.
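A hedged sketch of how such a Pipeline combined with K-folds Cross-Validation can be expressed with Spark MLlib is given below; the stage choices mirror those named in this paper, but the toy training rows and the parameter grid are illustrative assumptions, not the authors' dataset or settings.

    from pyspark.sql import SparkSession
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, StringIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator

    spark = SparkSession.builder.appName("PipelineCVExample").getOrCreate()

    # Toy rows standing in for the text classification dataset [4][5];
    # a real 5-fold run needs enough rows per fold.
    train = spark.createDataFrame(
        [("don deis is attending", "Person"), ("june 20 2021", "Date"),
         ("mike starek will speak", "Person"), ("the meeting is on may 3", "Date"),
         ("jessica smith arrived", "Person"), ("the review is due june 1", "Date")],
        ["text", "class"])

    lr = LogisticRegression(family="multinomial")
    pipeline = Pipeline(stages=[
        Tokenizer(inputCol="text", outputCol="words"),             # tokenization
        StopWordsRemover(inputCol="words", outputCol="filtered"),  # stop word removal
        HashingTF(inputCol="filtered", outputCol="features"),      # words -> feature vectors
        StringIndexer(inputCol="class", outputCol="label"),        # labels -> label indices
        lr])                                                       # learn the classifier

    grid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1]).build()
    cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid,
                        evaluator=MulticlassClassificationEvaluator(metricName="f1"),
                        numFolds=5)
    model = cv.fit(train)  # returns the best model over the parameter combinations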
C. Preliminary Prototype Implementation
A preliminary proof-of-concept prototype is
implemented on an Apache Spark cluster. The program is
written in python and the PySpark API [3] is used. The
prototype incorporates two methods: 1) the filter method
which filters the raw data based on user preferences and 2)
the search method, which uses keywords or sentences
specified in a user query for retrieving searched data from
filtered data.
1) Filter method: The filter method categorizes the raw
data using the machine learning model and named entity
recognition. Named entity recognition is used to extract
specific entities such as person names, locations, dates,
numbers, and money. A machine learning model is used to
classify the raw text data according to its content. The
categorized data are then filtered based on user preferences.
Consider, for example, the raw data that contains the
following text lines,
Line 1: “Don Deis and Mike Starek are representing the
Faculty Senate.”
Line 2: “Faculty Senate Speakers are invited to
participate.”
After applying the filter method on the raw data, the
named entity recognition function extracts the person names
“Don Deis” and “Mike Starek” from line 1 and classifies
line 1 as “Person” (class), and the machine learning model
also classifies it as “Person”. But in line 2, there is no
relevant entity e.g. person name, location, date, etc., so the
named entity recognition function cannot extract any entities
or classify this line. So, in the case of line 2, the machine
learning model classifies it as “Person”. If the user
preferences include “Person”, the filter method stores these
two lines. Each line of the filtered data is comma-separated,
containing the filtered text, the entity classes recognized by
the named entity recognition function, the extracted named
entities, and the class predicted by the machine learning model. Therefore,
the filtered data (shown in fig. 1) will contain the following
comma-separated lines,
Line 1: “Don Deis and Mike Starek are representing the
Faculty Senate.”, “Person”, “Don Deis, Mike Starek”,
“Person”
Line 2: “Faculty Senate Speakers are invited to
participate.”, “Person”
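A minimal sketch of the named-entity extraction step on line 1 using spaCy (see section III-C); the specific model name en_core_web_sm is our assumption, not stated in the paper.

    import spacy

    # Load a pre-trained English pipeline (assumes en_core_web_sm is installed).
    nlp = spacy.load("en_core_web_sm")

    doc = nlp("Don Deis and Mike Starek are representing the Faculty Senate.")

    # Each recognized entity carries its text span and its predicted class.
    print([(ent.text, ent.label_) for ent in doc.ents])
    # e.g. [('Don Deis', 'PERSON'), ('Mike Starek', 'PERSON')]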
Fig. 3 describes the filter method component of the
prototype. First, the texts from the raw data set are preprocessed
by eliminating punctuation marks and making the uppercase
letters lower case in the “Text preprocessing” function. The
preprocessed data are passed as input to the named entity
recognition function and machine learning model. A pre-
trained python library “spaCy” [13] is used to extract named
entities. A Multinomial Logistic Regression (discussed in
section III-B) classifier is used to classify the text according
to its content. This classifier has been trained using the
training dataset obtained from the Cognitive Computation
Group of University of Illinois [4][5] (discussed in section
IV). The classifier runs a sequence of pipeline stages (see fig.
3) including tokenization and stop words removal,
converting each word into feature vectors, encoding labels to
label indices, and learning a logistic regression model from
feature vectors and labels. Using Spark’s 5-fold Cross-
Validation [3], these pipeline stages have been run with
different combinations of logistic regression parameters to
find the best model. The best logistic regression model is
used in the filter method to classify the text data. After
classifying the raw data, the filter method filters data whose
class matches with user preferences and stores the filtered
data (see fig. 3).

Figure 3. Prototype of the filter method
Pseudocode 1 provides the algorithm of the proposed
filter method. The input of the filter method is user
preferences and the raw data directory where all the raw
data files are located. For example, raw data files can be
composed of multiple meeting-minutes files. The filter
method reads the files from the local file system directory
using a Spark function that creates RDDs in line 1 of the
pseudocode. In lines 2–7, the “map” function processes the
text data from each raw data file. Line 3 divides the text data
into a list of sentences. Line 4 preprocesses each sentence
by making the uppercase letters lower case and eliminating
the punctuation marks. In line 5, the “NER” function
extracts the named entities such as person names, locations,
and dates based on user preferences from each sentence,
whereas line 6 creates row instances where in each row
there are three columns: “Text”, “NERTokens”, and
“NERData” for the sentence, the named entity classes, and the
extracted named entities, respectively. Line 7 performs the
“collect” action to get all the data after performing the map
function. Line 8 creates a DataFrame with the three
columns. To classify the sentences according to their
content, this DataFrame is passed through the stages of the
machine learning model, and the model returns each row
with another additional column “label” for the predicted
class (see lines 9–10). Lines 11–14 store sentences based on
the user preferences: the method iterates through each row
and compares the predicted class from the machine learning
model and the named entity classes with the user preferences.
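Since Pseudocode 1 itself is not reproduced here, the following is a hedged PySpark reconstruction of the algorithm as described above; the sentence splitting, helper logic, and column names are assumptions rather than the authors' exact code.

    import re
    import string
    import spacy
    from pyspark.sql import SparkSession, Row

    spark = SparkSession.builder.appName("FilterMethod").getOrCreate()
    sc = spark.sparkContext
    nlp = spacy.load("en_core_web_sm")  # assumed model; must be available on executors

    def preprocess(sentence):
        # Line 4: lower-case the text and eliminate punctuation marks.
        return sentence.lower().translate(str.maketrans("", "", string.punctuation))

    def filter_method(raw_data_dir, user_preferences, ml_model):
        files = sc.wholeTextFiles(raw_data_dir)  # line 1: one RDD record per raw file
        rows = (files
                .flatMap(lambda kv: re.split(r"(?<=[.!?])\s+", kv[1]))  # line 3: sentences
                .map(preprocess)                                        # line 4
                .map(lambda s: (s, nlp(s).ents))                        # line 5: NER
                .map(lambda p: Row(Text=p[0],
                                   NERTokens=[e.label_ for e in p[1]],
                                   NERData=[e.text for e in p[1]]))     # line 6: 3 columns
                .collect())                                             # line 7
        df = spark.createDataFrame(rows)                                # line 8
        # Lines 9-10: the trained model adds a predicted-class column; the column
        # name "label" follows the paper's description (MLlib's default is "prediction").
        predicted = ml_model.transform(df)
        # Lines 11-14: keep rows whose predicted or NER class matches the preferences.
        return [r for r in predicted.collect()
                if r["label"] in user_preferences
                or any(c in user_preferences for c in r["NERTokens"])]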
2) Search Method: The search method looks within the
filtered data module for the data that match the user-
provided keywords or sentence. Consider the example
filtered data presented earlier. Recall the contents of the two
lines:
Line 1: “Don Deis and Mike Starek are representing the
Faculty Senate.”, “Person”, “Don Deis, Mike
Starek”, “Person”
Line 2: “Faculty Senate Speakers are invited to
participate.”, “Person”
If the user queries: “Is the name Don Deis mentioned in
the data files?”, the search method preprocesses the query
and classifies it as “Person” and extracts the named entity
“Don Deis”. Then, the search method looks for that class
and named entity in the filtered data and displays line 1:
“Don Deis and Mike Starek are representing the Faculty
Senate.” as the results to the user. If the user searches for the
keyword “Senate”, it will display the text contained in the
sentences from both lines 1 and 2.
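A minimal sketch of the first (keyword) search path, assuming the filtered data reside in a comma-separated file laid out as in the example above; the file name is hypothetical.

    import csv

    def search_by_keyword(filtered_path, keyword):
        # The first field of each comma-separated line holds the filtered sentence.
        matches = []
        with open(filtered_path, newline="") as f:
            for row in csv.reader(f):
                if row and keyword.lower() in row[0].lower():
                    matches.append(row[0])
        return matches

    # Searching for "Senate" returns the sentences from both lines 1 and 2.
    print(search_by_keyword("filtered_data.csv", "Senate"))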
Fig. 4 shows components of the proof-of-concept
prototype that implements the search method. There are two
ways to retrieve data from filtered data. In the approach
shown in fig. 4a, the user queries the filtered data by
specifying a keyword. The “Text processing” block in the
search method processes the comma-separated lines of
filtered data and returns lists of sentences from the filtered
data. Then, the search method searches for the keyword in
those sentences and displays the relevant search results to
the user. In the second way (see fig. 4b), the user queries the
filtered data by specifying a sentence. The search method
preprocesses the sentence by eliminating punctuation marks
and converts the uppercase letters into lower case letters.
The processed query is passed as an input in the machine
learning model and the named entity recognition function.
The machine learning model discussed in the Filter method
(see section III-C) is used to classify the query and the
named entity recognition function extracts the named
entities. The “Text processing” block in the search method
processes the comma-separated lines of filtered data and
returns lists of sentences and their corresponding classes and
named entities from the filtered data. Then, the class of the
user query, extracted named entities of the user query, and
processed filtered data are passed to a function. This
function of the search method returns relevant sentences
from the filtered data whose classes and named entities
match the class and named entities of the user query. Fig. 4
presents a high-level description of the two search methods.
Further details in terms of pseudocode could not be
provided due to space limitations.

Figure 4. Prototype of user query to filtered data: a) search by a
keyword b) search by a sentence
IV. PRELIMINARY EXPERIMENTS AND PERFORMANCE
ANALYSIS
A set of preliminary experiments are performed on two
Apache Spark clusters. The first cluster is set up on a local
computer and is representative of a use case where the user
uses her/his resources to filter meeting files (further details
are provided in the section IV-C). This cluster has been
created using a 4-core Intel i5 processor, 4 GB of RAM, and the
Ubuntu 18.04 operating system. This Spark cluster consists
of one master node and three worker nodes. Some
modifications are applied to the default Apache Spark
configuration [3], which includes one core for each worker
node, 1 GB for executor memory, and 1 GB for driver
memory.
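The reported configuration could be expressed as follows when creating the Spark session; this is a sketch, the master URL is hypothetical, and the paper does not show its actual configuration code.

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("FilterMethod")
             .master("spark://master-node:7077")     # hypothetical standalone master URL
             .config("spark.executor.cores", "1")    # one core for each worker node
             .config("spark.executor.memory", "1g")  # 1 GB executor memory
             .config("spark.driver.memory", "1g")    # 1 GB driver memory
             .getOrCreate())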
The second cluster is comprised of six cores and 32 GB
RAM running the Ubuntu 20.04 LTS operating system. This
Spark cluster consists of one master node and six worker
nodes. Each worker node has one core and 4 GB RAM, and
the driver memory is set to 30 GB. Such a larger system is
used when a large number (hundreds/thousands) of text
documents are to be filtered and a local computer does not
have the capability of performing the filtering operations in a
reasonable period. The performance of the proposed filtering
method is evaluated, and sample performance results are
presented for both clusters. Before discussing these results
the datasets used in the experiments are presented.
A. Training Dataset
The training and testing dataset used for the machine
learning algorithm is a labeled text classification dataset
from the Cognitive Computation Group of University of
Illinois [4][5]. This labeled dataset has two attributes: text
and label, where the label attribute specifies the class of the
text’s attribute. In this dataset, there are 50 hierarchical
classes to identify the text semantically. In this paper, this
dataset is utilized with class label attribute values modified
to just 10 labels. Since the primary focus of this research is
on filtering and storing of user-preferred data from a raw
data set using Apache Spark, the classification dataset used
in this paper can be easily replaced by any other relevant text
classification dataset.
B. Raw Data Set
Preliminary experiments were performed on two types
of synthetic text datasets. They are generated from a few
council meeting minutes that have been collected from a
city council website [15]. Each set of meeting minutes is
comprised of over 5000 words. The smaller raw data set is
obtained by replicating the meeting minutes, resulting in 50
raw data files that are stored in a local file system. Each file
contains the minutes of a meeting, and the file size is
approximately 125 KB. The raw data set is thus comprised
of 50 files, and the total raw data size is 6.2 MB. The second
type of data sets concerns large data obtained by copying
the meeting minutes data several times in a single file. Then,
this single file is replicated 6–57 times. The resulting
total size of the raw data set is 117 MB–1.1 GB. The first
type of data sets is used for the use case that focuses on the
user using a local computer, while the second type of data
sets are processed on the cluster in the cloud. The system
performance achieved with each type of data set and a
variable number of worker nodes is presented next.
C. Preliminary Results
1) Performance: The number of resources in a Spark
cluster and the size of the data being processed are observed
to have a significant impact on performance [18]. This
section focuses on analyzing the impact of the number of
Spark worker nodes and data size on the performance of the
proposed filter method. The prototype for the proposed
filtering technique has been run on the Spark cluster by
changing the number of worker nodes with each worker
node running on an independent core. The computation time
of the filtering technique has been compared for different
numbers of worker nodes and data sizes. The computation
time is the total time required by the filtering method
(application) to complete its processing of the raw data, i.e.,
the difference between the time at which the application
completes and the time at which it starts execution. The
filtering application has been run in a Spark cluster. As
discussed in section IV, two types of systems are
experimented with using different numbers of Spark worker
nodes and raw data sets.
Fig. 5 shows the computation time required by the
proposed filter method in the first Spark cluster residing on
a local computer. In this experiment, the filter method is run
for different numbers of worker nodes (varying from 1 to 3)
with the small raw data set. As expected, the computation
time of the filter method is observed to decrease by adding
more worker nodes to the cluster. For example, the
computation time is 210 seconds for one worker node,
whereas it decreases to 84 seconds after adding two more
worker nodes to the cluster. Fig. 6 displays the computation
time achieved with different numbers of worker nodes and
different sizes of the raw data set in the second Spark cluster
residing in the cloud. In this experiment, the filter method is
run for different numbers of worker nodes with the larger
raw data sets. The number of worker
nodes is varied from 1 to 6, and the raw data size is varied
from 117 MB to 1.1 GB accordingly at each run. For a
given data size, the time is observed to decrease with an
increase in the number of worker nodes. This decrease is
more pronounced for larger raw data sets. For
example, for a raw data size of 1.1 GB, the completion time
decreases from 10 min (600 seconds) to 4.1 min (246
seconds) (by 59%) as the number of worker nodes changes
from 1 to 6. In both experiments, adding more worker nodes
to the cluster increases the parallelism, so more tasks of the
Spark jobs are processed in parallel, thus decreasing the
computation time of the filter method.

Figure 5. Computation time for the Filter method vs the number of
worker nodes in the first Spark cluster residing on a local computer

Figure 6. Computation time for the Filter method vs the number of
worker nodes in the second Spark cluster residing in the cloud
This paper has included preliminary experimental results
that establish the viability of the proposed technique.
Significant work that includes using a higher number of
processing elements and multiple larger data sets is being
planned. The investigation will include the impact of the
system and workload parameters on various additional
performance metrics including speedup and efficiency. The
impact of different machine learning algorithms on
performance and accuracy will be investigated. This paper
has focused on the filtering method. A detailed performance
analysis of the searching method is also part of the plans for
future research. The results of this research will be
presented in our subsequent full research papers.
2) Accuracy: Accuracy is used to measure the
performance of the machine learning model. In this paper,
three different classifiers (discussed in section III-B) have
been trained using the training dataset obtained from the
Cognitive Computation Group of University of Illinois
[4][5] (discussed in section IV). Therefore, the accuracy of
these three different machine learning algorithms has been
assessed based on F-measure. F-measure is the weighted
harmonic mean of the model’s precision and recall [17],
and, as discussed in section III, it is used to measure the
model’s accuracy. Precision is the proportion of positive
results that are truly positive [17]. Recall is the percentage of
actual positives that are correctly identified by the model
[17]. The higher the F-
measure, the higher the accuracy of the model. As shown in
Table 1, the F-measure for Multinomial Logistic Regression
is higher than that for the Naïve Bayes and Multilayer
Perceptron classifiers. Therefore, Multinomial Logistic
Regression has been used in the prototype to classify the
raw data set in the filter method.

TABLE I. F-MEASURE OF MACHINE LEARNING MODELS

ML Models                         F-Measure
Multinomial Naïve Bayes           0.74
Multinomial Logistic Regression   0.94
Multilayer Perceptron Classifier  0.44
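For reference, with equal weight on precision and recall the F-measure reduces to their harmonic mean; the values below are illustrative only, not taken from the paper's experiments.

    def f_measure(precision, recall):
        # Harmonic mean of precision and recall (the balanced F1 score) [17].
        return 2 * precision * recall / (precision + recall)

    print(round(f_measure(0.90, 0.80), 3))  # 0.847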
3) Application: Applications of the filtering method,
including a program that uses the user preference “DATE”
to generate a meeting schedule and add it to the user’s
calendar of events, are currently under development.
V. CONCLUSIONS AND FUTURE WORK
Filtering user-preferred information from a large raw
dataset is challenging and time consuming. This work-in-
progress paper describes our preliminary research on an
effective technique for filtering user preferred data. This
technique leverages the parallel processing platform
provided by Apache Spark and its machine learning library.
The preliminary experimental results demonstrate the
viability of the approach for two different use cases and
captures the reduction in filtering latency achieved by the
proposed technique. Initial experiments have focused only
on the computation time achieved.
Directions for further research include the following:
• A detailed performance analysis for a higher number
of processing elements and various combinations of
system and workload parameters
• Experimenting with other machine learning based
classification algorithms for classifying raw data
• Experiments with more raw data sets
• Investigating auto-generation of the user preferences
based on previous queries submitted by the user
These are expected to generate insights (i) into the
relationship between system performance (e.g. speedup) and
workload and system parameters and (ii) into the accuracy of
raw data classification.
ACKNOWLEDGMENT
We are grateful to the Natural Sciences and Engineering
Research Council of Canada (NSERC) for providing the
financial support for this research.
REFERENCES
[1] “Apache Hadoop.” http://hadoop.apache.org/
[2] H. P. H. F. H. Gebara and K. J. Nowka, “Second-generation big data
systems,” Computer, vol. 48, pp. 36–41, Jan 2015.
[3] “Apache Spark.” https://spark.apache.org/
[4] X. Li and D. Roth, “Learning Question Classifiers,” The 19th
International Conference on Computational Linguistics, 2002.
[5] X. Li and D. Roth, “Learning Question Classifiers: The Role of
Semantic Information,” Journal of Natural Language Engineering,
2005.
[6] G. N. Chandrika and E. S. Reddy, "An Efficient Filtered Classifier for
Classification of Unseen Test Data in Text Documents," 2017 IEEE
International Conference on Computational Intelligence and
Computing Research (ICCIC), Coimbatore, 2017, pp. 1-4.
[7] M. P. Akhter, Z. Jiangbin, I. R. Naqvi, M. Abdelmajeed, A.
Mehmood, and M. T. Sadiq, "Document-level Text Classification
using Single-layer Multisize Filters Convolutional Neural Network,"
in IEEE Access.
[8] M. Ghosh and G. Sanyal. 2018, “Document Modeling with
Hierarchical Deep Learning Approach for Sentiment Classification,”
Proceedings of the 2nd International Conference on Digital Signal
Processing (ICDSP 2018), Association for Computing Machinery,
New York, NY, USA, 181–185.
[9] W. Yang, Y. Fu, and D. Zhang, "An Improved Parallel Algorithm for
Text Categorization," 2016 International Symposium on Computer,
Consumer and Control (IS3C), Xi'an, 2016, pp. 451-454.
[10] A. Gautam and P. Bedi, "MR-VSM: Map Reduce based vector Space
Model for user profiling-an empirical study on News data," 2015
International Conference on Advances in Computing,
Communications and Informatics (ICACCI), Kochi, 2015, pp. 355-
360.
[11] J. Zhang and P. Pu, “Refining preference-based search results through
Bayesian filtering,” Proceedings of the 12th international conference
on Intelligent user interfaces (IUI ’07), Association for Computing
Machinery, New York, NY, USA, 294–297.
[12] J. Su, T. Hong, J. Li, and J. Su, "Personalized Content-Based Music
Retrieval by User-Filtering and Query-Refinement," 2018 Conference
on Technologies and Applications of Artificial Intelligence (TAAI),
Taichung, 2018, pp. 177-180
[13] “spaCy”, https://spacy.io/
[14] “Multinomial Logistic Regression”,
https://spark.apache.org/docs/latest/ml-classification-
regression.html#multinomial-logistic-regression
[15] “City Council & Committee Agendas & Minutes”,
https://app06.ottawa.ca/cgi-bin/docs.pl?lang=en
[16] “K-folds Cross-Validation”, https://machinelearningmastery.com/k-
fold-cross-validation/
[17] “F-Measure, Precision, Recall”, https://deepai.org/machine-learning-
glossary-and-terms/f-score
[18] S. Shah, Y. Amannejad, D. Krishnamurthy and M. Wang, "Quick
Execution Time Predictions for Spark Applications," 2019 15th
International Conference on Network and Service Management
(CNSM), Halifax, NS, Canada, 2019, pp. 1-9.
[19] S. V. Siva reddy and S. Saravanan, "Performance Evaluation of
Classification Algorithms in the Design of Apache Spark based
Intrusion Detection System," 2020 5th International Conference on
Communication and Electronics Systems (ICCES), COIMBATORE,
India, 2020, pp. 443-447.
[20] H. Ahmed, E. M. G. Younis and A. A. Ali, "Predicting Diabetes
using Distributed Machine Learning based on Apache Spark*," 2020
International Conference on Innovative Trends in Communication
and Computer Engineering (ITCE), Aswan, Egypt, 2020, pp. 44-49.
[21] A. Ed-Daoudy and K. Maalmi, "Performance evaluation of machine
learning based big data processing framework for prediction of heart
disease," 2019 International Conference on Intelligent Systems and
Advanced Computing Sciences (ISACS), Taza, Morocco, 2019, pp.
1-5.
[22] “Multinomial Naïve Bayes”, https://spark.apache.org/docs/latest/ml-
classification-regression.html#naive-bayes
[23] “Multilayer Perceptron Classifier”,
https://pathmind.com/wiki/multilayer-perceptron
A Technique for Extracting User Specified
Information from Streaming Data
Bannya Chanda
Dept of Systems and Computer
Engineering
Carleton University
Ottawa, Canada
bannya.chanda@carleton.ca
Shikharesh Majumdar
Dept of Systems and Computer
Engineering
Carleton University
Ottawa, Canada
majumdar@sce.carleton.ca
Abstract— Users are often interested in a specific type of
data (user-preferred data) from large volumes of data streaming
into the system in real-time. An efficient system that stores
only the user-preferred data and discards the remaining data
can reduce the required storage space. It also reduces the
search latency, allowing users to find relevant information in
a timely manner.
The motivation behind this research is to devise a technique that
filters streaming data and stores only the filtered data for post-
processing, thereby saving storage space and reducing search
latency. A proof-of-concept prototype for this technique has
been built on Apache Spark by leveraging the parallel
processing framework for streaming data and its machine
learning library. The performance of the prototype subjected to
synthetically generated streaming data is analyzed. The analysis
of experimental results demonstrates the efficacy of this
technique and provides insights into system behavior and
performance.
Keywords—Apache Spark, parallel processing, text
classification, preference-based filtering, streaming data, system
performance
I. INTRODUCTION
The need for processing larger volumes of data is
increasing continuously. The problems associated with the
processing of such large data include the high cost of data
solutions, complex systems for managing data, keeping up
with the growth in data, data integration, and the constantly
changing nature of data. While users can benefit from this
growth in data, they must also be aware of the challenges such
as collecting, storing, sharing, and securing these data as well
as producing and utilizing meaningful insights from them
[25]. Users are reliant on actions such as storing and finding
relevant information from data streaming in real-time such as
live recordings or tweets, all of which contain a significant
amount of text data. Often, a given user is interested in a
particular set of topics or keywords. Their queries may
include, for example, the names of persons and places, a list
of medicines, the dates of sporting events, temperatures, and
products. The effectiveness of their actions is often dependent
on how accurately the data are being derived from the source
material. This brings the challenge of easily storing the
relevant information the user requires from streaming data.
The focus of this paper is to devise a technique that can filter
the data streaming in real-time to find the data as per the user’s
preferences and store this filtered data. After filtering the user-
preferred data, the user will be able to search through only this
filtered data set that is significantly smaller in volume in
comparison to the entire data. This technique is expected to
reduce the search latency as well as the volume of data that
needs to be stored for a specific user.
A parallel processing-based technique for filtering and
storing a user’s preferred information from stream data and
for searching within the filtered data is described in this paper.
A proof-of-concept prototype of the technique is built on
Apache Spark which is a second-generation batch processing
engine with stream processing capabilities deployed on the
cloud. Spark avoids expensive intermediate disk operations
by writing the intermediate data in memory and performing
in-memory computations to increase the performance of the
system. Spark’s structured streaming data processing library,
machine learning library, and high-level APIs for Python
make Spark appropriate for researching this technique. Using
Spark’s structured streaming data library, machine learning
library, and the Python libraries in Spark’s parallel processing
engine, this technique filters the data based on user
preferences in real-time and stores the filtered data to make
them available to the user in parallel. The streaming data
arrives from a streaming source. Apache Kafka, a distributed
streaming data platform has been used for interfacing with the
streaming source. The filtered data contain only the user-
preferred information from which the user can search for
relevant information such as text containing specific names,
dates, locations, and products. Little work exists on the use
of structured streaming data processing and machine
learning for real-time filtering of user-preferred data from
streaming data, which this paper focuses on. Some
preliminary high-level concepts to filter the large dataset
stored in a local directory using this technique are presented
in a short paper [3]. This previous work focuses on data at rest
(stored data). This research paper describes our new research:
algorithms and implementation details of the Filter prototype
to handle streaming data, and a detailed performance analysis
based on measurement of the Filter and Search methods in the
prototype by using a synthetic workload that allowed varying
the workload parameters enabling the answering of “what if”
questions. The primary contributions of this paper include the
following:
• A parallel technique for filtering streaming data based
on the user’s preference and storing the filtered data
for future use.
o Both an algorithm and a proof-of-concept prototype
are described.
• A prototype for parallel searching in the filtered data
based on the user’s queries.
• Insights into system behavior and performance based
on a performance analysis of the prototypes through
measurements made on their deployments on a cloud.
The rest of this paper is organized as follows: Section II
presents the background on several tools and concepts used in
this paper. Section III presents the literature review. The
proposed technique and implementation of a proof-of-concept
prototype for the technique are briefly explained in Section
IV. Section V describes the experiments performed on the
Filter method prototype and the experimental results. Section
VI describes the performance evaluation of the Search
method. Section VII concludes the paper.
II. BACKGROUND
A. Apache Spark
Apache Spark is a fast, scalable, and reliable parallel
processing framework for large datasets with stream
processing capability [1]. An introduction to the architecture
of Spark is provided first. Its stream processing capability that
this research uses is explained after that. Spark focuses on
speeding up the application by offering full in-memory
computation, low latency, and high-level APIs and tools.
Spark combines clusters of machines with a model for writing
programs, which allows the logic of data transformations and
machine learning algorithms to be written in a way that is
parallelizable [1]. Apache Spark uses a master/slave
architecture containing one master node and multiple worker
(slaves) nodes [1]. The standalone cluster manager is the
default built-in resource manager of Spark [1] that has been
used for this research. With its default standalone cluster
manager, an application can get all the cores available in the
cluster by default [1].
A central coordinator of the Apache Spark architecture
called a driver runs in the master node [1]. A Spark
application is launched in the master node. The Spark driver is the
program that calls the main program of an application and is
responsible for creating a Spark context [1]. The driver
program translates the application into actual Spark jobs,
which are further split into multiple smaller tasks [1]. Worker
nodes are responsible for executing the tasks assigned by the
cluster manager. A set of processes called executors reside
inside each worker node. An executor executes these tasks on
the partitioned RDD, performs operations, and returns the
result to the Spark context. Executors are launched once at
the beginning of a Spark application and then remain active
for the duration of the application. This is referred to as static
allocation. However, if the Spark application is configured to
execute dynamically, executors can be added or removed by
the cluster manager dynamically to match the overall
workload. The number of concurrent tasks an executor can
run is defined by the number of executor cores. With the
increase in the number of worker nodes or executor cores of
worker nodes, more RDD partitions can be executed in
parallel, which will make the application faster.
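A hedged sketch of the configuration knobs mentioned above; the values are examples rather than settings from the paper, and on the standalone cluster manager dynamic allocation also requires the external shuffle service.

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("ElasticFilter")
             .config("spark.executor.cores", "2")                # concurrent tasks per executor
             .config("spark.dynamicAllocation.enabled", "true")  # executors added/removed on demand
             .config("spark.dynamicAllocation.minExecutors", "1")
             .config("spark.dynamicAllocation.maxExecutors", "6")
             .config("spark.shuffle.service.enabled", "true")    # needed for dynamic allocation
             .getOrCreate())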
The Spark core architecture includes several libraries
such as Spark Streaming and Spark MLlib [1]. Spark
Streaming processes the stream data in real-time. Apache
Spark’s MLlib comprises common machine learning
algorithms including classification algorithms [1]. These
algorithms are implemented as Spark operations performed
on RDDs. Among them, five different types of classification
algorithms: Multinomial Naïve Bayes, Multinomial Logistic
Regression, Multilayer Perceptron, Decision Tree, and
Random Forest Classifier of Spark MLlib [1] have been
trained with the text classification dataset [4][5] in this paper.
Of these, multinomial logistic regression is chosen to be
used in the data filtering technique as it demonstrates the
highest accuracy. A Multinomial Logistic Regression [18] is
a supervised classification algorithm that is used when the
dependent variable is categorical with more than two classes
(labels). It predicts the outcome by explaining the
relationship between the class and the features extracted from
the input data. Apache Spark MLlib APIs, Pipeline, and K-
folds Cross-Validation are used in this research. The pipeline
contains a sequence of algorithms to be run in a specific order
to process data. K-folds Cross-Validation is a re-sampling
method used to evaluate the machine learning model if the
input data size is small [20]. It fits and validates the machine
learning model by splitting the training dataset into K folds
which are used as separate training and test datasets. It runs
the machine learning algorithms with different combinations
of parameters and training and testing datasets to find the best
model.
Spark Structured Streaming is a stream-processing engine
with fast, fault-tolerant, and scalable stream processing built
on the Spark SQL engine [26]. It has been built by combining
batch and streaming computation to simplify the
development of continuous and real-time applications. Spark
Structured Streaming processes a data stream as a series of
small batches using a micro-batch processing engine, which
allows it to achieve low end-to-end latencies [26]. The micro-
batch processing engine groups the live data into small
batches. The key idea in Spark Structured Streaming is to
handle a live data stream as a table. Every data item on a small
batch that is arriving as a stream is appended to the input table
as a row [26]. Spark uses the already existing Dataset and
DataFrame data structure for the input table [26]. A
DataFrame is a distributed collection of data organized into
columns with column names and type information. It is
conceptually equivalent to a table in a relational database [22]. A
Dataset is an extension of DataFrame, which is also a
distributed collection of a data structure with a collection of
strongly typed objects that are mapped to the relational
schema [22]. Spark is connected to stream data sources via
middleware such as Kafka (described in the next sub-section)
to create the DataFrames/Datasets. Depending on a user-
defined trigger interval, new data items arriving on the
stream are appended to the input table; queries are run on
this table, and the results of the queries are saved in a result
table [26].
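A hedged PySpark sketch of this model is shown below; the broker address, topic name, and trigger interval are hypothetical, and the Kafka source additionally requires the spark-sql-kafka package on the classpath.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.appName("StreamingInputTable").getOrCreate()

    # Each arriving record is appended as a row of the unbounded input table.
    lines = (spark.readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", "localhost:9092")
             .option("subscribe", "meeting-minutes")
             .load()
             .select(col("value").cast("string").alias("text")))

    # A query over the input table runs micro-batch by micro-batch.
    query = (lines.writeStream
             .outputMode("append")
             .format("console")
             .trigger(processingTime="5 seconds")  # user-defined trigger interval
             .start())
    query.awaitTermination()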
B. Apache Kafka
Apache Kafka is an open-source distributed data
streaming platform used by many companies to publish,
subscribe, store, and process streams of records in real-time
[2]. Kafka acts as a middle layer that is built into streaming
data pipelines and allows a user to send messages between
systems and/or applications in distributed systems; it is also
built into the systems and applications that consume that data
[2]. Thus, the main idea is that the sender, known as a
producer, sends messages to the Kafka server, and the
receiver, known as a consumer, receives only the messages
it is interested in (as specified through Kafka
topics) from the Kafka server.
name that is unique across the Kafka cluster. Each producer
publishes their data to the respective topics, while the
consumers read messages from their subscribed topic [2].
Hence, Kafka stores and organizes data across its topics.
Inside the Kafka cluster, topics are separated into individual
partitions, and partitions are then replicated among the
brokers [2]. This allows partitions to parallelize a topic across
multiple Kafka brokers or Spark clusters.
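For illustration, a producer publishing one record per sentence might look as follows; the kafka-python client, broker address, and topic name are our assumptions, not choices stated in the paper.

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers="localhost:9092")

    # Publish each sentence as a record on a hypothetical topic.
    for sentence in ["An appointment has been scheduled with Dr. Jessica Smith "
                     "on June 20, 2021, at 10:30 am.",
                     "The hospital is located at 243 Bank St."]:
        producer.send("meeting-minutes", sentence.encode("utf-8"))
    producer.flush()  # block until all buffered records are sent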
C. spaCy
spaCy is a free, open-source Python library with pre-trained
models that is used for an information extraction technique
that automatically identifies named entities in a text and
classifies them into predefined categories [17]. This
information extraction technique is called named entity
recognition (NER).
entities are defined as objects or quantities of interest such as
a person, organization, and location names, dates, times,
percentages, and money.
III. RELATED WORK
This paper focuses on a user preference-based filtering
technique. Various researchers have published works on the
text classification process using machine learning algorithms
in general and the machine learning library and structured
streaming library of Apache Spark in particular. A scalable
real-time health status prediction system in Apache Spark is
presented in [6]. The proposed technique receives user health
data through tweet streams and predicts their health status by
applying a Decision Tree model to the data. A Spark-based
sentiment analysis technique that operates on streaming
social network data to provide fast and most accurate results
is described in [10]. The proposed sentiment analysis
technique using parallelized Naïve Bayes provides both faster
and more accurate results compared to the sequential Naïve
Bayes technique and SVM technique [10]. Four MLlibs of
Apache Spark have been evaluated to provide an effective
solution for the prediction of heart diseases in the research
presented in [8]. The authors in [9] developed five models
using distributed machine learning based on Apache Spark
for predicting diabetes. Among those models, the Logistic
Regression Classifier achieved the highest accuracy. The
authors in [7] implemented collaborative filtering (CF)
recommender system on Apache Spark that can draw on
users’ past opinions for the process of prediction generation.
The authors in [12] proposed a personalized advertisement
recommendation system that extracts the user's preferences
using the Latent Dirichlet Allocation (LDA) model and
generates a recommended list of advertisements based on
user behavior [12]. A MapReduce-based vector space to
create a user profile based on the news items read by the user
is described in [11]. The authors define user profiling as a
process of learning and analyzing the demands of a user from
the data linked with that user. In the research presented in
[13], the authors used the MapReduce-based Rocchio
relevance feedback algorithm for document classification.
The experiments showed that the proposed solution based on
the Rocchio algorithm with MapReduce technology
improved the speed of processing massive amounts of data
and guaranteed better accuracy [13]. In the research presented
in [14], the authors used different machine learning
techniques to classify research paper abstracts from the fields
of science, business, and social science. Although the existing
literature described in this section provides effective
techniques for text classification, recommendation system,
sentiment analysis of streaming social network data, and
health data prediction through tweet streams, none of these
works addresses the problem of filtering user-specified
information from streaming data that this paper focuses on.
IV. PROPOSED TECHNIQUE
The proposed technique is explained with the help of fig.
1. Each component of the system presented in the figure is
described next.
• Raw Dataset: The raw dataset is a collection of
records that come from the Kafka stream source. Each
record is comprised of a sentence. For instance, the
raw dataset can be text sentences converted from an
audio recording of a live meeting. The filtering system
processes the raw text dataset and filters the processed
data based on user preferences. A more detailed
description is captured in fig. 2.
• User Preferences: User preferences consist of a set of
keywords, each of which describes a topic the user is
interested in e.g., dates, locations, people’s names, or
products. The filtering technique uses the user
preferences to filter raw data and store only the
preferred data for handling user queries.
• Filter: The “Filter” module filters the raw dataset
using the user-provided preferences and stores the
related user-preferred data. This module is referred to
as the “Filter method” in this research. The Filter
method is further discussed in Section IV-A.
• Filtered Data: The filtered data consist of the data
related to user-specified keywords. They are stored in
a file after running the filtering method on the raw
dataset. The filtered data are the output of the Filter
method. The format of filtered data is discussed in
Section IV-A.
• User Search Queries: A user can search within the
stored filtered data to get related data by providing a
keyword or a sentence. A method referred to as the
“Search method” is used in this research to handle
user search queries. The Search method is discussed
in Section IV-B.
Application: This is an optional component that can
coexist with the component that processes user
queries. This is a program that processes the filtered
data for a specific intent. For example, an application
can be developed to generate appointment schedules
for the user based on the filtered data related to dates
and times.
A. Filter Method
Fig. 2 illustrates the components of the Filter method. The
Filter method reads the batches of records from the Kafka
topic and distributes them equally among executor cores. Each
executor stores the data in memory and performs operations
on the data. In the “Text preprocessing” function, each
executor core preprocesses the data by eliminating punctuation and converting the capital letters to lower case. Then, the
preprocessed texts are classified by the named entity
recognition function and the machine learning (ML) model.
The named entity recognition (NER) function extracts specific
entities such as person names, locations, dates, and numbers
(described in Section II-C). The Python library “spaCy” is
used to build the NER function. A multinomial logistic
regression classifier (discussed in Section II-A) has been
trained using the dataset obtained from the Cognitive
Computation Group of the University of Illinois [4][5].
Fig. 1. Proposed technique
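As an illustration, the preprocessing and NER steps could look as shown in the following sketch; the function names are ours, and the mapping from spaCy entity labels to the classes used in this work is an assumption, since the prototype's exact mapping is not listed in the paper.

import string
import spacy  # assumes the en_core_web_sm model has been installed

nlp = spacy.load("en_core_web_sm")

# Hypothetical mapping from spaCy entity labels to the classes used in this work.
LABEL_TO_CLASS = {"PERSON": "Person", "DATE": "Date", "TIME": "Time",
                  "GPE": "Location", "LOC": "Location", "MONEY": "Money",
                  "CARDINAL": "Number", "PRODUCT": "Product"}

def preprocess(sentence):
    # Eliminate punctuation and convert capital letters to lower case.
    return sentence.translate(str.maketrans("", "", string.punctuation)).lower()

def extract_entities(sentence):
    # Run spaCy NER and collect the entity classes and the extracted entities.
    doc = nlp(sentence)
    classes = [LABEL_TO_CLASS.get(ent.label_, ent.label_) for ent in doc.ents]
    entities = [ent.text for ent in doc.ents]
    return classes, entities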
Logistic Regression is used in the Filter method because it was
observed to outperform a number of other algorithms
experimented with [15]. This classifier is used to classify the
text according to its content. The classifier runs a sequence of pipeline stages (see fig. 2): tokenization and stop-word removal, converting each word into feature vectors, encoding labels to label indices, and learning a Logistic Regression model from feature vectors and labels. To evaluate the Logistic
Regression model, these previously discussed pipeline stages
have been run with different combinations of Logistic
Regression parameters using Spark’s 5-fold Cross-Validation
(discussed in Section II-A). The parameter combination that
gives rise to the best result is used by the Filter method to
classify the text data. The classes of the classified text data are
compared with the user preferences and matched texts are
stored as filtered data (see fig. 2). The filtered data form a comma-separated file containing the filtered text, the entity classes recognized by the named entity recognition function, the extracted named entities, and the class predicted by the machine learning model.
The ML model and the NER function classify the text based
on classes specified in the training dataset [4][5]. Classes
include Person, Date, Time, Number, Product, Location, and
Money. For example, consider a given batch of records
containing the following sentences. The preference provided
by the user is “Date”.
Record 1: “An appointment has been scheduled with Dr.
Jessica Smith on June 20, 2021, at 10:30 am.”
Record 2: “The hospital is located at 243 Bank St.”
After receiving these records from a Kafka topic, the Filter
method processes these texts and passes the texts as input to
the NER function and the ML model. NER extracts the
person’s name - “Jessica Smith”, date - “June 20, 2021” and
time - “10:30 am” from Record 1 and classifies Record 1 as
“Person”, “Date”, and “Time” (classes). The ML model
classifies Record 1 as “Person”, and “Date”. In the case of
Record 2, both the ML model and the NER function classify
it as “Location”. Next, the Filter method compares these
classes of each sentence with the user preferences. As the user
preference is “Date”, Record 1 is stored by the Filter method
as filtered data. So, the filtered data (shown in fig. 1) will
contain the following comma-separated sentence,
Record 1: “An appointment has been scheduled with Dr.
Jessica Smith on June 20, 2021, at 10:30 am.”, “Person, Date,
Time”, “Jessica Smith, June 20, 2021, 10:30 am”, “Person”.
Note that since the raw data comprise multiple records, only the records that correspond to the user preferences
(Record 1 in the previous example) are stored and the other
records are discarded. This can lead to a large reduction in data
that gets stored (see Section VI). This stored data is used by
the Search method to retrieve the user-specified data based on
user queries. A proof-of-concept prototype of the Filter
method is implemented on an Apache Spark cluster using the
PySpark API. More details of the Filter method are available
in the thesis [15].
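As a concrete illustration, a minimal sketch of such a pipeline using Spark's MLlib is given below; the paper identifies the stages only by function, so the specific stage classes (Tokenizer, StopWordsRemover, CountVectorizer, StringIndexer) and the parameter grid are our assumptions.

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, StopWordsRemover, StringIndexer, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Pipeline stages: tokenization, stop-word removal, feature vectors, label indexing, LR.
tokenizer = Tokenizer(inputCol="Text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filteredWords")
vectorizer = CountVectorizer(inputCol="filteredWords", outputCol="features")
indexer = StringIndexer(inputCol="class", outputCol="label")  # assumes a "class" column
lr = LogisticRegression(featuresCol="features", labelCol="label", family="multinomial")
pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, indexer, lr])

# 5-fold cross-validation over combinations of Logistic Regression parameters.
grid = (ParamGridBuilder()
        .addGrid(lr.regParam, [0.01, 0.1])
        .addGrid(lr.maxIter, [10, 50])
        .build())
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid,
                    evaluator=MulticlassClassificationEvaluator(labelCol="label"),
                    numFolds=5)
model = cv.fit(training_df)  # training_df: labeled DataFrame with "Text" and "class"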
1) Algorithm: The algorithm for the Filter method is
described by pseudocode 1a and 1b. Pseudocode 1a
corresponds to the receiving of the raw data, whereas
pseudocode 1b provides the details of the filtering operations.
The Filter method takes as input the user preferences, specified as multiple comma-separated classes, and outputs the filtered data. Line
1 calls the “getUserPrefHistory” function to get all the user’s
previous preferences that have been generated from the
history of previous queries sent by the user. This function
reads all the previous preferences, if any, from the user’s
preference history file and adds those preferences to the input
array of the preferences. Line 2 of the pseudocode reads
the batches of records from each partition of the Kafka topic
(described in Section II-B) using a Spark function that creates a Dataset (described in Section II-A) and distributes the records
among the executor cores equally. Line 3 calls a function
“process_stream” to process each batch. In lines 4-10, the text
data from a given batch are processed using the “map”
function. At first, the text data are divided into a list of
sentences (line 6), and each sentence is preprocessed by
lowercasing the capital letters and eliminating the
punctuation in line 7. In line 8, the “NER” function is applied to each sentence to extract the named entities. The output
from this function is used in line 9 to create row instances,
where in each row there are three columns: “Text”,
“NERTokens”, and “NERData” for the sentence, the named-entity classes, and the extracted named entities, respectively. Line 10 creates a DataFrame with the three
columns. This DataFrame is passed as an input to the stages
of the ML model to classify the sentences according to their
content in line 11. The model returns each row with another
additional column “label” for the predicted class. Lines 13-
16 iterate through each row and compare the predicted classes
with user preferences using the function
“compareClassesWithPref”. If the predicted classes of a
specific row match with the user preferences, the row is
stored in the filtered data using the “StoreFilteredData”
function in line 16.
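A simplified Python rendering of these filtering steps is sketched below; the helper names follow the pseudocode, their bodies and the exact Row schema are illustrative, and details such as making the NER model available on the executors are elided.

from pyspark.sql import Row

def process_stream(batch_df, user_prefs, spark, ml_model):
    # Lines 4-10: preprocess each record and apply NER using a "map" over the batch.
    def to_row(record):
        sentence = preprocess(record.value)                 # line 7: clean the sentence
        classes, entities = extract_entities(record.value)  # line 8: apply NER
        return Row(Text=sentence,
                   NERTokens=",".join(classes),             # named-entity classes
                   NERData=",".join(entities))              # extracted named entities
    df = spark.createDataFrame(batch_df.rdd.map(to_row))    # line 10: three columns

    # Line 11: classify each sentence; the trained model appends the predicted class.
    predictions = ml_model.transform(df)

    # Lines 13-16: store only the rows whose classes match the user preferences.
    for row in predictions.collect():
        if compare_classes_with_pref(row, user_prefs):      # cf. compareClassesWithPref
            store_filtered_data(row)                        # cf. StoreFilteredData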
B. Search Method
The Search method searches data within the filtered data
concurrently based on a linear search algorithm described in
[24].
Fig. 2. Prototype of Filter method
Similar to this linear search algorithm, the Search method searches each sentence of the filtered data until it
finds the sentence that matches the argument specified in the
user’s query. The Search method distributes the filtered data equally among the executor cores and then performs the search in each executor core. This method receives the user
query in two ways: by keywords and by a sentence. A proof-
of-concept prototype of the Search method is implemented on
an Apache Spark cluster using the PySpark API. Components
of the proof-of-concept prototype of the Search method are
shown in fig. 3.
1) User query by keywords: In the method shown in fig.
3a, the user provides keywords to the Search method as input
to get the results related to those keywords from the filtered
data. First, the Search method processes the comma-
separated lines of filtered data in the “Filtered Data
Processing” function. The Search method looks for the
keywords in the sentences returned by this “Filtered Data
Processing” function. Then, the Search method displays the
relevant search results to the user.
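A minimal sketch of the keyword search is given below, assuming the filtered data are stored as comma-separated lines whose first field is the filtered text; the file path and parsing are simplified.

import csv
import io

def search_by_keywords(spark, keywords, path="filtered_data.csv"):
    # Distribute the comma-separated filtered data equally among the executor cores.
    lines = spark.sparkContext.textFile(path)
    # "Filtered Data Processing": extract the filtered text (first field) of each line.
    sentences = lines.map(lambda line: next(csv.reader(io.StringIO(line)))[0])
    # Linear search: keep the sentences that contain any user-provided keyword.
    kws = [k.lower() for k in keywords]
    return sentences.filter(lambda s: any(k in s.lower() for k in kws)).collect()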
2) User query by a sentence: In the second approach (see
fig 3b), the user provides a sentence to the Search method as
input. First, the sentence provided by the user is preprocessed
by the “Query Cleaning” function, which eliminates
punctuation and converts the capital letters into lower case
letters. Then, the processed sentence is classified by the ML
model, and the named entities are extracted from the sentence
by the NER function (see fig. 3b). The classes of the sentence
are saved in a file as user preferences. The same ML model
and the NER function used in the Filter method (see Section
IV-A) are used here as well. As in the case of the first method
(fig. 3a), the “Filtered Data Processing” function is used to
process the comma-separated lines of filtered data. In this
method, this “Filtered Data Processing” function returns a list
of sentences and their corresponding classes and named
entities from the filtered data. Then, the classes and named
entities of the user-provided sentence are compared with the
classes and named entities of each row of the list returned by
this function. The classes associated with that user-provided
sentence are saved as user preferences in a user preference
history file. User preferences can be generated in two ways:
either by the Search method used on earlier occasions or by
direct specification from the user. The Filter method adds the
preferences from this history file in the user-provided
preferences list (pseudocode 1a, line 1), and this preferences
list is used to filter the raw dataset.
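The sentence-based query can be sketched along the following lines; it reuses the hypothetical helpers from the earlier sketches, and predict_classes, load_filtered_rows, and the preference-history file format are likewise our assumptions.

def search_by_sentence(spark, query, ml_model, history_path="user_pref_history.txt"):
    # "Query Cleaning": eliminate punctuation and convert to lower case.
    cleaned = preprocess(query)
    # Classify the query with the NER function and the ML model.
    query_classes, query_entities = extract_entities(query)
    query_classes += predict_classes(ml_model, cleaned)  # hypothetical helper
    # Save the query's classes as user preferences in the preference history file.
    with open(history_path, "a") as f:
        f.write(",".join(sorted(set(query_classes))) + "\n")
    # Compare the query's classes and named entities with those of each filtered row.
    rows = load_filtered_rows(spark)  # hypothetical: (text, classes, entities) tuples
    return [text for text, classes, entities in rows
            if set(query_classes) & set(classes)
            or set(query_entities) & set(entities)]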
Further details in terms of pseudocode could not be
provided due to space limitations. These are available in the
thesis [15].
V. PERFORMANCE EVALUATION OF THE FILTER METHOD
A set of experiments is performed on the system prototype
to evaluate the performance of the Filter method deployed on
an Apache Spark cluster. The experimental analysis considers
doctor-patient conversations as the raw dataset. A producer application sends stream data in small batches to
partitions in a Kafka topic (described in Section II-B). Each
batch has a fixed number of records (sentences), and after
sending a batch of records the producer waits for a predefined
fixed amount of time before sending the next batch. The Filter
method in Spark Cluster reads the records from Kafka topic
partitions and then filters the records. The producer sends
batches of synthetically generated data to the partitions in a
Kafka topic, and then the Filter method processes these
batches. The details of the data producer, system
configuration, and workload parameters are discussed in the
next sections.
A. System Configuration
The Apache Spark cluster, deploying Spark version 3.0.1 and comprising one master node and three worker nodes, is set
up on an Amazon EC2 cloud infrastructure in the ca-central-1
region [23]. A c5a.4xlarge-type EC2 instance is used for
running the master node and one worker node. This EC2
instance type is comprised of 16 cores and 32 GB RAM
running the Ubuntu 20.04 LTS operating system. The driver
memory is set to 12 GB. Since this paper does not focus on scheduling, the scheduling policy is set to FIFO (first-in-first-out), the default scheduling policy of Spark. Resource allocation is set to static in the cluster. For
the experiments on the Filter method, each worker node is
running on 8 cores. The Kafka cluster is set up on the same
c5a.4xlarge-type EC2 instance where the master node and one
worker node reside. The producer application is run on a
t2.micro-type EC2 instance equipped with 1 GB of RAM, one
CPU core, and a clock speed of 2.5 GHz running on the
Ubuntu 20.04 LTS operating system.
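For illustration, such a configuration could be expressed when creating the Spark session, as in the minimal sketch below; the application name and master URL are placeholders, and the option values simply mirror the settings described above.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("FilterMethod")
         .master("spark://<master-host>:7077")            # placeholder master URL
         .config("spark.driver.memory", "12g")            # driver memory
         .config("spark.executor.cores", "8")             # cores used per worker node
         .config("spark.scheduler.mode", "FIFO")          # Spark's default scheduling policy
         .config("spark.dynamicAllocation.enabled", "false")  # static resource allocation
         .getOrCreate())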
B. Data Producer
A producer application sends 80 batches of synthetically generated raw data to the topic of Apache Kafka. To send data
into partitions of a Kafka topic, the producer API of “kafka-
python”, a Python client for the Apache Kafka distributed
stream processing system [16], has been used in this paper. A
Kafka topic is created with multiple partitions. A producer
application is created using the Python programming
language, which pushes records to the Kafka topic using the
producer API [16]. The producer sends a stream of records
(sentences) that gets partitioned into record sets that get sent
to a Kafka topic where they are processed in parallel. Then,
the Filter method in Apache Spark filters the records in
parallel. The raw dataset source from where the producer
application reads records is discussed in Section V-C. For a given batch, the producer application divides the total number of records in the batch by the number of partitions in the topic and sends an equal number of records to each partition. The number of partitions in the topic is set to be equal to the number of executor cores of the Apache Spark cluster for all the experiments.
Fig. 3. Prototype of user query to filtered data: a) search by keywords; b) search by a sentence
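A minimal sketch of such a producer based on kafka-python is shown below; the broker address and topic name are placeholders, and the default argument values mirror the workload parameters used in the experiments.

import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="<broker-host>:9092")

def send_batches(records, num_batches=80, batch_len=1000,
                 num_partitions=8, batch_interval=15, topic="raw-data"):
    per_partition = batch_len // num_partitions
    for b in range(num_batches):
        batch = records[b * batch_len:(b + 1) * batch_len]
        # Send an equal number of records to each partition of the topic.
        for p in range(num_partitions):
            for rec in batch[p * per_partition:(p + 1) * per_partition]:
                producer.send(topic, value=rec.encode("utf-8"), partition=p)
        producer.flush()
        time.sleep(batch_interval)  # wait BI seconds before sending the next batch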
C. Raw Dataset
Experiments were performed on synthetically generated
stream data. A raw dataset was generated synthetically by
using several sample medical conversations between doctors
and patients that were collected from two websites [19][21].
Each set of conversations is comprised of over 200 words.
The raw dataset is obtained by replicating the content of the
conversations. The conversations were replicated 800 times
in a file of the local directory. The resulting total size of the
raw dataset is 10 MB. The producer application reads the
sentences from this raw dataset file to create batches of
records and sends the batches to the Kafka topic.
D. Workload Parameters
To evaluate the performance of the Filter method, several
experiments were run on the Spark and Kafka Cluster by
changing the number of executor cores, length of batch
records, and batch intervals. A summary of these workload
and system parameters is displayed in Table I. The
experiments are performed by following a factor-at-a-time
approach where one of the parameters is changed while
others are held at their default values (indicated in bold in
Table I). Note that fixed values of workload parameters (e.g.,
the number of batches and Kafka topic) and system
parameters (e.g., driver memory and worker nodes memory)
are used in each experiment.
E. Performance Metrics
The performance of the Filter method is evaluated using
two performance metrics that are described next.
1) Average Batch Processing Latency (TP): Average
batch processing latency is the average time in seconds a
batch takes to be processed by the Filter method on a given
number of resources on the Apache Spark cluster. To
compute TP, two timestamps are taken using time.time() from
the Python time library: one after the Kafka producer finishes sending
a batch and one after the Filter method finishes processing the
batch from Kafka by generating the filtered data. The
difference between these two timestamps (in seconds) is used
to compute the batch processing latency. TP is then computed
by taking the average of all the computed batch processing
latencies.
2) Throughput (X): Throughput is the number of bytes of batch records processed per second. To compute X,
the total number of bytes completed is divided by the elapsed
time (in seconds) of the Filter method. The total number of bytes completed is computed by adding the total bytes of each
batch. The elapsed time (TE) is the total time in seconds taken
by the Filter method to complete the processing of all the
batches. This is the time that the Filter method spends on the
processing of all the batches from the beginning to the end.
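In code, the two metrics can be computed along the following lines (a sketch; the variable names are ours).

# Average batch processing latency (TP): the mean of the per-batch latencies, each
# being the difference between the timestamp taken after the producer finishes
# sending a batch and the timestamp taken after the Filter method finishes
# processing that batch.
def average_batch_processing_latency(send_done_ts, process_done_ts):
    latencies = [done - sent for sent, done in zip(send_done_ts, process_done_ts)]
    return sum(latencies) / len(latencies)

# Throughput (X): the total bytes across all batches divided by the elapsed time TE,
# the total time the Filter method spends processing all the batches.
def throughput(batch_sizes_bytes, elapsed_time_te):
    return sum(batch_sizes_bytes) / elapsed_time_te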
F. Effect of the Length of Batch in Records (BL)
The length of Batch is the number of records (sentences)
in each batch sent by the producer. Each record is a single
sentence. To see the effect of the length of batch (BL) on the
performance of the Filter method, an experiment was
conducted by changing BL. The number of executor cores in
a worker node was held at the default value of 8 and a default
batch interval (BI) value of 15 seconds is used. The BL is
varied from 200 to 2000 records in this experiment. Fig. 4
shows the average batch processing latency (TP) achieved with different batch lengths. Fig. 4 shows
that TP increases with an increase in BL. The Filter method
requires more time to process the increased BL. Fig. 5 shows
that the throughput (X) increases with an increase in BL.
Since BI is fixed to a default value of 15 seconds, X increases
as the total number of bytes in records increases.
G. Effect of Batch Interval (BI)
The time difference (in seconds) between two consecutive batches sent by a producer is referred to as the batch interval
(BI). To see the effect of the batch interval (BI) on the Filter
method, the experiment was conducted by changing the value
of BI from 15 to 30 seconds. The number of executor cores is
set to the default value in a worker node and the default value
of batch records (BL) of 1000 is used. Fig. 6 shows the TP
achieved with different BI. Increasing the value of BI has no effect on TP. The reason for this is that although BI varies, the batch length (BL) remains the same. Thus, the time required by the Filter
method to process each batch remains the same. TP is not
impacted by a higher value of BI. The higher BI only provides
a longer time window for the Filter method to complete
processing of the current batch before the next batch arrives.
Fig. 7 displays the throughput (X) achieved with different values of BI. X is observed to decrease with an increase in BI. This is because although the BL is fixed, the total time taken by the Filter method to complete the processing of 80 batches increases with a higher value of BI. Thus, the throughput decreases with a higher value of BI.
TABLE I. WORKLOAD PARAMETERS
Workload Parameter                 Value
Length of Batch in Records (BL)    {200, 500, 1000, 2000} records
Batch Interval (BI)                {15, 20, 30} seconds
Number of Batches (NB)             80
Kafka Topic                        1
System Memory/Worker Node          16 GB
Driver Memory                      8 GB
Number of Worker Nodes (NW)        {1}
Number of Executor Cores (N)       {1, 2, 4, 6, 8, 12}
Fig. 4. Average batch processing latency for the Filter method vs the
length of batch in records
Fig. 5. Throughput for the Filter method vs the length of batch in
records
H. Effect of the Number of Executor Cores (N)
To see the effect of the number of processing resources
on the Filter method, an experiment was performed by
changing the number of executor cores (N) from 1 to 12 in a
worker node for a default batch interval (BI) and a default
length of batch in records (BL). Fig. 8 displays the average
batch processing latency (TP) achieved with different values
of N. A lower TP was achieved with an increase in N. For
example, when N is 1, TP is 12.793 seconds, whereas TP
decreases to 6.524 seconds when N is 8. This is because with
an increasing N in a worker node more batch records execute
in parallel. Fig. 9 shows that increasing N does not have any
impact on the system throughput (X). X is observed to be the
same with an increase in N. For the default BL value of 1000
records, the total number of bytes remains the same. As a result, with an increasing number of cores a given batch is processed more quickly, but the total time (TE) required by the Filter method to process all the batches does not change.
VI. PERFORMANCE EVALUATION OF THE SEARCH
METHOD
The performance of the Search method that searches
within the filtered data is compared with the performance of
the Search method that searches within the non-filtered data
for assessing the performance improvement that accrues from
selective filtering. In this research, searches were performed in two ways: sequentially and in parallel. Four different experiments were conducted to
evaluate the performance of the Search method: both
searching based on keywords and a sentence are investigated.
In the first experiment, a sequential search based on keywords
is performed for the filtered and non-filtered data. In the
second experiment, the Search method searches within the
filtered and non-filtered data based on keywords in parallel.
In the third experiment, a sequential search based on a
sentence runs within the filtered and non-filtered data. In the
fourth experiment, a parallel search based on a sentence is
performed within the filtered and non-filtered data. For the
non-filtered raw dataset, a 4.1 GB raw dataset was used. The
filtered data were retrieved using the Filter method by
filtering the 4.1 GB non-filtered raw dataset based on the user
preferences: “DATE”, “LOCATION” and “PERSON”. In the
sequential Search method, the number of worker nodes is set
to 1 with 1 executor core. In parallel executions, the number
of worker nodes is set to 2 with 2 executor cores each. This
is because the Search method searches within a smaller volume of data (the filtered data) and thus does not require as many resources as the Filter method.
The computation time (TS) achieved by a search within
the non-filtered data and TS achieved by a search within the
filtered data are used to measure the Filter method’s
efficiency (EF). EF is the ratio between TS achieved with non-
filtered data and TS achieved with filtered data. A higher
efficiency ratio indicates a greater reduction in TS achieved by a search within the filtered data relative to a search within the non-filtered data. Table II shows EF
achieved for sequential and parallel search.
Sequential search: EF achieved with the search by
keywords is 105, which means that TS for the search
within non-filtered data is 105 times that of TS for the
search within filtered data. EF is 57.6 when the search is
performed by a sentence; thus, TS for the search within
non-filtered data is 57.6 times that of TS for the search
within filtered data.
Parallel search: EF is 63.6 when the search is performed
by keywords; thus, TS for the search within non-filtered
data is 63.6 times that of TS for the search within filtered
data. However, TS for the search within non-filtered data
is 29 times that of TS for the search within filtered data
when the search is performed by a sentence.
VII. CONCLUSIONS AND FUTURE WORK
Accurately filtering the information a user is interested in from real-time streaming data is a challenging problem. This
paper addresses this problem by proposing a Filter and Search
method that filters the data streaming in real-time based on
user preferences and speeds up the subsequent searching
operations on the stored filtered data. The proposed filtering
technique leverages the parallel processing platform as well
as the machine learning library of Spark for achieving its
goals. The experimental results show the viability of the
technique while demonstrating the reduction of filtering
latency and speedup achieved by the filtering technique due to parallel executions.
Fig. 6. Average batch processing latency for the Filter method vs the batch interval
Fig. 7. Throughput vs the batch interval
Fig. 8. Average batch processing latency for the Filter method vs the number of executor cores
Fig. 9. Throughput for the Filter method vs the number of executor cores
Key insights resulting from the performance analysis conducted for the prototype and the data sets experimented with include the following:
Effect of parallelism in execution: TP decreases with
an increase in N. This is because more records of a
given batch can be processed concurrently by the
Filter method with a higher value of N. However, an
increase in N does not have any impact on
throughput. As BL and BI are fixed, X remains the
same for different values of N.
Effect of the Length of Batch in Records (BL): An
increase in BL increases the average processing time
of each batch. A higher value of BL indicates that a
greater number of records (sentences) in a batch
needs to be processed by the Filter method. Both TP
and X increase with an increase in BL.
Effect of Batch Interval (BI): An increase in BI does
not seem to have any impact on TP achieved by the
Filter method. But X is observed to decrease with a
higher value of BI because the elapsed time (defined in Section V-E) increases with a higher value of BI.
Filtering efficiency (EF): The high EF achieved with the data experimented with demonstrates the effectiveness of filtering for both sequential and parallel search. A decrease in TS by orders of
magnitude is observed to be achieved by searching
within the filtered data in comparison to searching
within non-filtered data.
Directions for future research include the following:
Experimenting with more synthetic and real-world
datasets forms an interesting direction for future
research.
Converting live speech to text and filtering the text
based on user preferences using the Filter method
warrants investigation.
ACKNOWLEDGMENTS
We are thankful to the Natural Sciences and Engineering
Research Council of Canada (NSERC) for providing
financial support for this research.
REFERENCES
[1] Apache Spark, “Apache Spark”, [Online]. Available at
https://spark.apache.org/. [Accessed: 01-Apr-2021].
[2] Apache Kafka, “Apache Kafka”, [Online]. Available at
https://kafka.apache.org/. [Accessed: 20-Feb-2021].
[3] B. Chanda and S. Majumdar, “Filtering and Storing User Preferred
Data: An Apache Spark Based Approach,” in Proceedings of Cloud
and Big Data Computing, 2020, pp. 679-685.
[4] X. Li and D. Roth, “Learning Question Classifiers,” 19th International
Conference on Computational Linguistics, 2002, pp. 556-562.
[5] X. Li and D. Roth, “Learning Question Classifiers: The Role of
Semantic Information,” Journal of Natural Language Engineering,
vol. 12, no. 3, pp. 229-249, 2005.
[6] A. Ed-daoudy and K. Maalmi, "Application of Machine Learning
Model on Streaming Health Data Event in Real-Time to Predict Health
Status Using Spark," International Symposium on Advanced Electrical
and Communication Technologies (ISAECT) Conference, 2018, pp. 1-
4.
[7] A. Alexopoulos, G. Drakopoulos, A. Kanavos, S. Sioutas and G.
Vonitsanos, "Parametric Evaluation of Collaborative Filtering over
Apache Spark," 5th South-East Europe Design Automation, Computer
Engineering, Computer Networks and Social Media Conference
(SEEDA-CECNSM), 2020, pp. 1-8.
[8] A. Ed-Daoudy and K. Maalmi, "Performance Evaluation of Machine
Learning Based Big Data Processing Framework for Prediction of
Heart Disease," International Conference on Intelligent Systems and
Advanced Computing Sciences (ISACS), 2019, pp. 1-5.
[9] H. Ahmed, E. M. G. Younis and A. A. Ali, "Predicting Diabetes using
Distributed Machine Learning Based on Apache Spark," International
Conference on Innovative Trends in Communication and Computer
Engineering (ITCE), 2020, pp. 44-49.
[10] V. J. Nirmal and D. I. G. Amalarethinam, "Real-Time Sentiment
Prediction on Streaming Social Network Data Using In-Memory
Processing," World Congress on Computing and Communication
Technologies (WCCCT), 2017, pp. 69-72.
[11] A. Gautam and P. Bedi, "MR-VSM: Map Reduce Based Vector Space
Model for User Profiling-an Empirical Study on News Data,"
International Conference on Advances in Computing,
Communications, and Informatics (ICACCI), 2015, pp. 355-360.
[12] X. Liu and Y. Zhang, "A Kind of Personalized Advertising
Recommendation Method Based on User-Interest-Behavior Model,"
8th International Symposium on Next Generation Electronics (ISNE)
Conference, 2019, pp. 1-4.
[13] W. Yang, Y. Fu, and D. Zhang, "An Improved Parallel Algorithm for
Text Categorization," International Symposium on Computer,
Consumer and Control (IS3C), 2016, pp. 451-454.
[14] S. Chowdhury and M. P. Schoen, "Research Paper Classification using
Supervised Machine Learning Techniques," Intermountain
Engineering, Technology and Computing (IETC) Conference, 2020,
pp. 1-6.
[15] B. Chanda, “A Parallel Processing Technique for Filtering and Storing
User Specified Data”, MASc Thesis, Department of Systems and
Computer Engineering, Carleton University, May 2021.
[16] Kafka Python, “KafkaProducer”, [Online]. Available at https://kafka-
python.readthedocs.io/en/master/apidoc/KafkaProducer.html.
[Accessed: 20-Feb-2021].
[17] spaCy. “spaCy”. [Online]. Available at https://spacy.io/. [Accessed:
20-Feb-2020].
[18] Apache Spark, “Multinomial Logistic Regression”, [Online].
Available at https://spark.apache.org/docs/3.0.1/ml-classification-
regression.html#multinomial-logistic-regression. [Accessed: 10-Mar-
2021].
[19] Anil, “Conversation Between Doctor and Patient [Five Scenarios]”,
Lemon Grad. [Online]. Available at
https://lemongrad.com/conversation-between-doctor-and-patient/.
[Accessed: 01-Sept-2020].
[20] Machine learning mastery, “K-folds Cross-Validation”, [Online].
Available at https://machinelearningmastery.com/k-fold-cross-
validation/. [Accessed: 20-Mar-2021].
[21] Prasanna, “English Conversation Between Doctor and Patient in Four
Simple Scenarios”, A Plus Topper. [Online]. Available at
https://www.aplustopper.com/conversation-between-doctor-and-
patient/. [Accessed: 01-Sept-2020].
[22] Apache Spark, “Spark SQL, DataFrames and Datasets Guide”,
[Online]. Available at https://spark.apache.org/docs/3.0.1/sql-
programming-guide.html. [Accessed: 02-Mar-2021].
[23] Amazon, “Amazon Web Services”, [Online]. Available at
https://aws.amazon.com/ec2/. [Accessed: 05-Mar-2020].
[24] Programiz, “Linear Search”, [Online]. Available at
https://www.programiz.com/dsa/linear-search. [Accessed: 03-Mar-
2021].
[25] Solvexia, “15 Big Data Problems You Need to Solve”, [Online].
Available at https://www.solvexia.com/blog/15-big-data-problems-
you-need-to-solve. [Accessed: 20-Mar-2021].
[26] Apache Spark, “Structured Streaming Programming Guide”, [Online].
Available at http://spark.apache.org/docs/3.0.1/structured-streaming-
programming-guide.html. [Accessed: 02-Mar-2021].
TABLE II. FILTERING EFFICIENCY (EF)
Search method    Search by    EF
Sequential       Keywords     105
Sequential       Sentences    57.6
Parallel         Keywords     63.6
Parallel         Sentences    29