MAST30034 Applied Data Science week 2
PySpark Basic Operations
Selection
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.appName("CPU ADS wk2")
.config("spark.sql.repl.eagerEval.enabled", True)
.config("spark.sql.parquet.cacheMetadata", "true")
.config("spark.sql.session.timeZone", "Etc/UTC")
# If the data later turns out to be too large for memory, adjust these settings
.config("spark.driver.memory", "2g")
.config("spark.executor.memory", "4g")
.getOrCreate()
)
sdf = spark.read.parquet('data/2019-01.parquet')
sdf.limit(5)
VendorID  tpep_pickup_datetime  tpep_dropoff_datetime  passenger_count  trip_distance  RatecodeID  ...
1         2019-01-01 00:46:40   2019-01-01 00:53:20    1.0              1.5            1.0
1         2019-01-01 00:59:47   2019-01-01 01:18:59    1.0              2.6            1.0
2         2018-12-21 13:48:30   2018-12-21 13:52:40    3.0              0.0            1.0
2         2018-11-28 15:52:25   2018-11-28 15:55:45    5.0              0.0            1.0
2         2018-11-28 15:56:57   2018-11-28 15:58:33    5.0              0.0            2.0
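Beyond limit, specific columns can be chosen with select. A minimal sketch, using columns visible in the preview above:
# keep only the columns needed, returning a new DataFrame
sdf.select('VendorID', 'tpep_pickup_datetime', 'trip_distance').limit(5)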
Filtering
from pyspark.sql import functions as F
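A small filtering sketch (the conditions here are illustrative, not the notebook's own):
# keep trips with at least one passenger and a positive distance
sdf.filter((F.col('passenger_count') >= 1) & (F.col('trip_distance') > 0)).limit(5)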
GroupBy
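A sketch of a grouped aggregation; the aggregates are chosen for illustration, and the column names come from the data above:
# number of trips and average fare per pickup location
(
    sdf.groupBy('PULocationID')
    .agg(
        F.count('*').alias('number_of_trips'),
        F.round(F.avg('fare_amount'), 4).alias('average_fare_amount_usd'),
    )
    .limit(5)
)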
Saving Data
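A minimal sketch of writing a DataFrame back out as parquet (the output path is an assumption):
# write out as parquet; 'overwrite' replaces any existing output at that path
sdf.write.mode('overwrite').parquet('data/2019-01-processed.parquet')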
Sampling Data
SAMPLE_SIZE = 0.01
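A sketch of drawing a 1% sample with this fraction (the seed is arbitrary):
# approximate 1% random sample without replacement, useful for quick prototyping
sample_sdf = sdf.sample(withReplacement=False, fraction=SAMPLE_SIZE, seed=0)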
Lazy Evaluation
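Transformations such as filter and select are lazy: Spark only builds an execution plan, and nothing is computed until an action is called. A small sketch:
# defining the transformation does no work yet
long_trips = sdf.filter(F.col('trip_distance') > 10)
# computation only happens when an action such as count() runs
long_trips.count()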
Rename and type conversion
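A sketch of renaming a column and casting a type; the new names are illustrative, and passenger_count is stored as a double in the raw data:
from pyspark.sql.types import IntegerType

renamed_sdf = (
    sdf
    # give the pickup timestamp a shorter name
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')
    # cast passenger_count from double to integer
    .withColumn('passenger_count', F.col('passenger_count').cast(IntegerType()))
)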
Spark SQL
sdf.createOrReplaceTempView('taxi')
sql_query = spark.sql("""
SELECT
    PULocationID,
    COUNT(tpep_pickup_datetime) AS number_of_trips,
    ROUND(AVG(trip_distance), 4) AS average_distance_miles,
    ROUND(AVG(fare_amount), 4) AS average_fare_amount_usd
FROM
    taxi
WHERE
    passenger_count == 5
    AND trip_distance > 0
GROUP BY
    PULocationID
ORDER BY
    average_fare_amount_usd DESC
""")
sql_query.limit(10)
GeoPandas
import pandas as pd
import geopandas as gpd
import folium
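A sketch of loading the NYC taxi zone geometries with GeoPandas (the shapefile path is an assumption):
# read the taxi zone shapefile and reproject to latitude/longitude for folium
zones_gdf = gpd.read_file('data/taxi_zones/taxi_zones.shp').to_crs('EPSG:4326')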
Related Parameters
tiles: generate a base map of a given width and height with either a default tileset or a custom tileset URL. The following tilesets are built into Folium; pass any of them to the "tiles" keyword:
    "OpenStreetMap"
    "Stamen" (Terrain, Toner, and Watercolor)
    "CartoDB" (positron and dark_matter)
fill_color (string, optional): area fill color, defaults to blue. Can be a hex code, a color name, or, if you are binding data, one of the following ColorBrewer palettes: 'BuGn', 'BuPu', 'GnBu', 'OrRd', 'PuBu', 'PuBuGn', 'PuRd', 'RdPu', 'YlGn', 'YlGnBu', 'YlOrBr', and 'YlOrRd'.
nan_fill_color (string, default 'black'): area fill color for NaN or missing values. Can be a hex code or a color name.
fill_opacity (float, default 0.6): area fill opacity, range 0-1.
nan_fill_opacity (float, default fill_opacity): area fill opacity for NaN or missing values, range 0-1.
line_color (string, default 'black'): GeoJSON geopath line color.
line_weight (int, default 1): GeoJSON geopath line weight.
line_opacity (float, default 1): GeoJSON geopath line opacity, range 0-1.
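A minimal sketch of how these parameters fit together in a choropleth. It assumes the GeoDataFrame and aggregated DataFrame sketched above, and the key_on property name is an assumption about the zone GeoJSON:
# base map centred roughly on New York City
m = folium.Map(location=[40.73, -73.94], tiles='OpenStreetMap', zoom_start=10)

# shade each pickup zone by its average fare
folium.Choropleth(
    geo_data=zones_gdf.to_json(),            # zone geometries as GeoJSON
    data=sql_query.toPandas(),               # aggregated results from Spark SQL
    columns=['PULocationID', 'average_fare_amount_usd'],
    key_on='feature.properties.LocationID',  # assumed property name in the GeoJSON
    fill_color='YlOrRd',
    nan_fill_color='black',
    fill_opacity=0.6,
    line_color='black',
    line_weight=1,
    line_opacity=1,
).add_to(m)
m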
Key Points for Visualization
Your image should be self-explanatory.
Don't make it too complex.
The image should tell a story together with your brief explanation.
Scales and axes should be reasonable and easy to read.
Use a type of plot suitable for your purpose.
It should generally look nice.
Project 1 Pipeline