-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFileManager.py
More file actions
73 lines (58 loc) · 3.15 KB
/
Copy pathFileManager.py
File metadata and controls
73 lines (58 loc) · 3.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from pyspark.sql import SparkSession
from DatabaseManager import DatabaseManager
class FileManager:
    """Load the squirrel and park CSV files with Spark and persist them.

    Each ``load_*`` method reads one CSV file into a Spark DataFrame,
    collects the distinct values of the columns it needs on the driver and
    hands the resulting lists of dicts to ``DatabaseManager.store_data``.
    """

    def __init__(self):
        """Create (or reuse) the Spark session and the database manager."""
        self.spark_session = SparkSession.builder.appName("squirrel-data").getOrCreate()
        self.db_manager = DatabaseManager()

    @staticmethod
    def _format_id_number(value):
        """Return a canonical string form of a numeric identifier.

        Whole numbers are rendered without a fractional part (``1.0`` ->
        ``"1"``); every other value is rendered with ``str``. ``None`` is
        passed through unchanged, and values that cannot be converted with
        ``int`` (e.g. non-numeric strings, NaN) fall back to ``str(value)``
        instead of raising.

        Args:
            value: The raw identifier as read from the CSV file.

        Returns:
            The identifier as ``str``, or ``None`` when ``value`` is ``None``.
        """
        if value is None:
            return None
        try:
            is_whole = value == int(value)
        except (TypeError, ValueError, OverflowError):
            # Not a plain finite number: keep its textual form as the id.
            return str(value)
        return str(int(value)) if is_whole else str(value)

    def __load_csv(self, file_path):
        """Read ``file_path`` as a CSV file with a header row.

        Column types are inferred by Spark (``inferSchema=True``), so numeric
        looking columns such as "Park ID" may come back as int or float.

        Args:
            file_path: Path of the CSV file to read.

        Returns:
            The Spark DataFrame produced by ``spark_session.read.csv``.
        """
        # TODO: validate if file exists
        return self.spark_session.read.csv(file_path, header=True, inferSchema=True)

    def load_squirrels_data(self, file_path):
        """Load the squirrels CSV and store its parks, colors and squirrels.

        Reads the columns "Park ID", "Primary Fur Color", "Squirrel ID" and
        "Activities". Park ids are normalized with ``_format_id_number`` so
        they match the ids written by ``load_parks_data``.

        Args:
            file_path: Path of the squirrels CSV file.
        """
        data_frame = self.__load_csv(file_path)
        format_id = self._format_id_number

        # Store Parks data (ids only; the remaining park columns are written
        # by load_parks_data).
        parks_values = data_frame.select("Park ID").distinct().collect()
        parks = [{"id": format_id(row["Park ID"])} for row in parks_values]
        print("- Parks data: ", parks)
        self.db_manager.store_data("parks", "id", parks)

        # Store Colors data, skipping rows without a fur color.
        colors_values = data_frame.select("Primary Fur Color").distinct().collect()
        colors = [
            {"name": row["Primary Fur Color"]}
            for row in colors_values
            if row["Primary Fur Color"]
        ]
        print("- Colors data: ", colors)
        self.db_manager.store_data("colors", "id", colors)

        # Store Squirrels data. Color ids are read back from the database
        # because the rows stored above carry only a name.
        squirrels_values = (
            data_frame.select(["Squirrel ID", "Activities", "Park ID", "Primary Fur Color"])
            .distinct()
            .collect()
        )
        colors_ids_dict = {
            row["name"]: row["id"]
            for row in self.db_manager.get_data("colors", get_distinct=True)
        }
        squirrels = [
            {
                "id": row["Squirrel ID"],
                "activities": row["Activities"],
                "park_id": format_id(row["Park ID"]),
                "primary_fur_color_id": (
                    colors_ids_dict[row["Primary Fur Color"]]
                    if row["Primary Fur Color"]
                    else None
                ),
            }
            for row in squirrels_values
        ]
        print("- Squirrels data: ", squirrels)
        self.db_manager.store_data("squirrels", "id", squirrels, update_on_conflict=True)
        print("=> Parks, Colors and Squirrels data was loaded and stored")

    def load_parks_data(self, file_path):
        """Load the parks CSV and store its areas and parks.

        Reads the columns "Area ID", "Area Name", "Park Name", "Park ID" and
        "Other Animal Sightings". Both tables are stored with
        ``update_on_conflict=True``.

        Args:
            file_path: Path of the parks CSV file.
        """
        data_frame = self.__load_csv(file_path)

        # Store Areas data
        areas_values = data_frame.select(["Area ID", "Area Name"]).distinct().collect()
        areas = [{"id": row["Area ID"], "name": row["Area Name"]} for row in areas_values]
        print("- Areas data: ", areas)
        self.db_manager.store_data("areas", "id", areas, update_on_conflict=True)

        # Store Parks data
        parks_values = (
            data_frame.select(["Park Name", "Park ID", "Other Animal Sightings", "Area ID"])
            .distinct()
            .collect()
        )
        parks = [
            {
                "id": self._format_id_number(row["Park ID"]),
                "name": row["Park Name"],
                "area_id": row["Area ID"],
                "other_animal_sightings": row["Other Animal Sightings"],
            }
            for row in parks_values
        ]
        print("-Parks data: ", parks)
        self.db_manager.store_data("parks", "id", parks, update_on_conflict=True)
        print("=> Areas and Parks data was loaded and stored")
if __name__ == "__main__":
    # Script entry point: build one FileManager and run both loaders in
    # sequence, squirrels file first and parks file second.
    manager = FileManager()
    load_jobs = (
        (manager.load_squirrels_data, "./data/squirrel-data.csv"),
        (manager.load_parks_data, "./data/park-data.csv"),
    )
    for load, csv_path in load_jobs:
        load(csv_path)