-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path2_threading_preprocess.py
More file actions
73 lines (58 loc) · 2.2 KB
/
2_threading_preprocess.py
File metadata and controls
73 lines (58 loc) · 2.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import datetime
import pandas as pd
import threading
def time_it(func):
def wrapper(*args, **kwargs):
start_time = datetime.datetime.now()
print(f"{func.__name__} started")
result = func(*args, **kwargs)
end_time = datetime.datetime.now()
print(f"{func.__name__} finished: {end_time - start_time}")
return result
return wrapper
@time_it
def read_csv(csv_file):
return pd.read_csv(csv_file)
@time_it
def clean_data(df):
return df[(df['passenger_count'] != 0) &
(df['trip_distance'] != 0) &
(df['fare_amount'] >= 0) &
(df['total_amount'] >= 0)].copy()
@time_it
def convert_dates(df):
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
return df
@time_it
def calculate_trip_duration(df):
df['trip_duration'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime'])
return df
def process_chunk(df):
cleaned_taxi = clean_data(df)
cleaned_taxi = convert_dates(cleaned_taxi)
cleaned_taxi = calculate_trip_duration(cleaned_taxi)
return cleaned_taxi
if __name__ == "__main__":
# start_time = datetime.datetime.now()
source_csv = "/mnt/c/Users/natha/code/ParallelPython/data/yellow_tripdata_2015-01.csv"
taxi = read_csv(source_csv)
start_time = datetime.datetime.now()h
# Split the DataFrame into chunks (adjust num_chunks as needed)
num_chunks = 4
chunks = [taxi[i::num_chunks] for i in range(num_chunks)]
# Create and start threads
threads = []
results = []
for chunk in chunks:
thread = threading.Thread(target=lambda r, c: r.append(process_chunk(c)), args=(results, chunk))
threads.append(thread)
thread.start()
# Wait for all threads to finish
for thread in threads:
thread.join()
# Concatenate the results
cleaned_taxi = pd.concat(results)
# Further processing or saving the output
# cleaned_taxi.to_csv("processed_output.csv", index=False)
print(f"Processing finished. Time Elapsed: {datetime.datetime.now() - start_time}")