-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path3_spark_preprocess.py
More file actions
71 lines (52 loc) · 1.93 KB
/
3_spark_preprocess.py
File metadata and controls
71 lines (52 loc) · 1.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import datetime
import functools
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.types import TimestampType
def time_it(func):
    """Decorate *func* so that each call prints its start and elapsed time.

    Before calling *func* the wrapper prints ``"<name> started"``; after it
    returns, the wrapper prints ``"<name> finished: <elapsed>"``, where
    ``<elapsed>`` is the difference between two ``datetime.datetime.now()``
    readings. The return value of *func* is passed through unchanged.

    If *func* raises, the exception propagates and no "finished" line is
    printed.

    NOTE(review): only the time spent inside the call itself is measured.
    If *func* merely sets up deferred work (e.g. a lazily evaluated
    pipeline), the reported time presumably excludes that work -- confirm
    before relying on these numbers.
    """
    # functools.wraps copies __name__, __doc__, etc. onto the wrapper so
    # decorated functions stay introspectable and debuggable.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = datetime.datetime.now()
        print(f"{func.__name__} started")
        result = func(*args, **kwargs)
        end_time = datetime.datetime.now()
        print(f"{func.__name__} finished: {end_time - start_time}")
        return result

    return wrapper
@time_it
def read_csv(spark, csv_file):
    """Load *csv_file* through the CSV reader of the given Spark session.

    The reader is asked to treat the first row as a header and to infer
    column types (``header=True``, ``inferSchema=True``). Returns whatever
    ``spark.read.csv`` returns.
    """
    reader_options = {"header": True, "inferSchema": True}
    return spark.read.csv(csv_file, **reader_options)
@time_it
def clean_data(df):
    """Return *df* restricted to rows that pass four sanity checks.

    A row is kept only when ``passenger_count`` and ``trip_distance`` are
    both non-zero and ``fare_amount`` and ``total_amount`` are both
    non-negative.
    """
    has_passengers = col('passenger_count') != 0
    has_distance = col('trip_distance') != 0
    fare_not_negative = col('fare_amount') >= 0
    total_not_negative = col('total_amount') >= 0

    keep_row = (
        has_passengers
        & has_distance
        & fare_not_negative
        & total_not_negative
    )
    return df.filter(keep_row)
@time_it
def convert_dates(df):
    """Replace the pickup and drop-off columns with ``to_timestamp`` values.

    ``tpep_pickup_datetime`` is converted first, then
    ``tpep_dropoff_datetime``; each column is overwritten in place under
    its existing name.
    """
    for column_name in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
        df = df.withColumn(column_name, to_timestamp(column_name))
    return df
@time_it
def calculate_trip_duration(df):
    """Add a ``trip_duration`` column: drop-off time minus pickup time.

    Both datetime columns are cast to ``TimestampType`` before the
    subtraction, so the function does not rely on them having been
    converted beforehand.

    NOTE(review): the type of the resulting column depends on how the
    Spark version in use handles timestamp subtraction -- confirm.
    """
    pickup_ts = col('tpep_pickup_datetime').cast(TimestampType())
    dropoff_ts = col('tpep_dropoff_datetime').cast(TimestampType())
    return df.withColumn('trip_duration', dropoff_ts - pickup_ts)
if __name__ == "__main__":
    # Script entry point: read the taxi CSV, clean it, convert the datetime
    # columns, add the trip duration, and show the first five rows.
    start_time = datetime.datetime.now()

    # The default input is the original hard-coded location; an optional
    # first command-line argument overrides it so the script can run on
    # other machines without editing the source.
    default_csv = "/mnt/c/Users/natha/code/ParallelPython/data/yellow_tripdata_2015-01.csv"
    source_csv = sys.argv[1] if len(sys.argv) > 1 else default_csv

    spark = SparkSession.builder.appName("TaxiPreprocessing").getOrCreate()
    try:
        taxi = read_csv(spark, source_csv)
        cleaned_taxi = clean_data(taxi)
        cleaned_taxi = convert_dates(cleaned_taxi)
        cleaned_taxi = calculate_trip_duration(cleaned_taxi)
        cleaned_taxi.show(5)
        # If you need to save the output
        # cleaned_taxi.write.csv("output_path", header=True)
    finally:
        # Always release the Spark session, even if a step above raised.
        spark.stop()

    print(f"Spark Processing finished. Time Elapsed: {datetime.datetime.now() - start_time}")