Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 111 additions & 9 deletions pfb_to_zip/pfb_to_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import re
import subprocess
import csv
import json
from shutil import rmtree, make_archive, copy, copytree

import requests
Expand All @@ -15,6 +16,7 @@
from pfb.reader import PFBReader
from pfb.exporters import tsv
from dictionaryutils.utils import node_values_to_codes
from pfb.writer import PFBWriter


def to_folder_name(value: str) -> str:
Expand All @@ -28,8 +30,6 @@ class Config():
def __init__(self):
self.reader = None



class PFBExporter:
def __init__(self, pfb_file_path:str, tmp_folder:str, output_path:str, config_file_path:str, ontology:str=None, extra_analysis:str=None) -> None:
self.pfb_file_path = pfb_file_path
Expand Down Expand Up @@ -77,7 +77,6 @@ def __new__(cls, *args, **kwargs):
if cls._validate(*args, **kwargs):
return super().__new__(cls)



def initialize(self):
print("Initializing...")
Expand Down Expand Up @@ -316,7 +315,6 @@ def zip(self):
self.zip_file_output_path = make_archive(output_filename, 'zip', self.tmp_folder, self.zip_subfolder)
print(f"ZIP file created at {self.zip_file_output_path}")


def clean_up(self):
print(f"Removing all the resources used and the temporary data directory at {self.tmp_folder}")

Expand All @@ -335,18 +333,121 @@ def clean_up(self):
print(e)


class Difference:
    """Build a subject-level diff between two PFB (avro) exports.

    Records are grouped by subject submitter id.  A subject is considered
    changed when, after dropping the created/updated timestamps and ignoring
    record order, its records differ between the old and the new file.  The
    diff output contains *all* records of every new or changed subject taken
    from the new file; subjects that disappeared are only reported in the log.
    """

    # Bookkeeping fields that change on every export and must not count as a diff.
    _TIMESTAMP_FIELDS = ('created_datetime', 'updated_datetime')
    _AVRO_SUFFIX = '.avro'
    _LOG_SUFFIX = '_log.md'

    def __init__(self, old_file_path: str, new_file_path: str):
        """Remember the paths of the old (baseline) and new PFB files."""
        self.old_file = old_file_path
        self.new_file = new_file_path

    def read_avro(self, file_path):
        """Read a PFB file and group its records by subject.

        Returns a dict ``{subject_id: {node_name: [record, ...]}}``.

        'subject' records are keyed by their own ``submitter_id``; every other
        node is keyed by its ``subjects.submitter_id`` field.  Records without
        such an id are skipped, so they never take part in the diff.
        NOTE(review): this presumably means nodes not linked directly to a
        subject are ignored -- confirm that is intended.
        """
        records_by_subject = {}

        with open(file_path, 'rb') as f:
            with PFBReader(f) as reader:
                for record in reader:
                    obj = record['object']
                    name = record['name']

                    if name == 'subject':
                        sid = obj.get('submitter_id')
                    else:
                        sid = obj.get('subjects.submitter_id')

                    if not sid:
                        continue

                    nodes = records_by_subject.setdefault(sid, {})
                    nodes.setdefault(name, []).append(record)

        return records_by_subject

    def _strip_timestamps(self, nodes):
        """Return a copy of ``nodes`` whose record objects lack the timestamps.

        The input records are not mutated: both the record and its 'object'
        dict are shallow-copied before the fields are removed.
        """
        stripped = {}
        for node_name, records in nodes.items():
            stripped[node_name] = []
            for r in records:
                stripped_record = r.copy()
                obj = r['object'].copy()
                for field in self._TIMESTAMP_FIELDS:
                    obj.pop(field, None)
                stripped_record['object'] = obj
                stripped[node_name].append(stripped_record)
        return stripped

    def _normalize(self, nodes):
        """Return a comparable form of ``nodes``: no timestamps, stable order."""
        normalized = self._strip_timestamps(nodes)
        for node_name in normalized:
            normalized[node_name] = sorted(
                normalized[node_name],
                # The JSON text is only a sort key; default=str keeps values
                # JSON cannot encode (bytes, datetimes, ...) from raising.
                key=lambda r: json.dumps(r, sort_keys=True, default=str)
            )
        return normalized

    @classmethod
    def _log_path_for(cls, output_path: str) -> str:
        """Return the markdown log path that accompanies ``output_path``.

        Only a trailing '.avro' is replaced, so directory names containing
        '.avro' are left alone, and a path without that extension still gets a
        distinct log file instead of colliding with the diff output itself.
        """
        if output_path.endswith(cls._AVRO_SUFFIX):
            output_path = output_path[:-len(cls._AVRO_SUFFIX)]
        return output_path + cls._LOG_SUFFIX

    def _describe_removed(self, old, removed_sids):
        """Return the log lines describing the subjects missing from the new file."""
        if not removed_sids:
            return ["No subjects removed."]

        lines = [f"{len(removed_sids)} subject(s) removed:"]
        for sid in sorted(removed_sids):
            lines.append(f" - {sid}")
            for records in old[sid].values():
                for record in records:
                    lines.append(f" - {record['name']}: {record['object']}")
        return lines

    def _collect_changed(self, old, new):
        """Return every record of the subjects that are new or have changed."""
        diff_records = []
        for sid, new_nodes in new.items():
            is_new = sid not in old
            if is_new or self._normalize(new_nodes) != self._normalize(old[sid]):
                for records in new_nodes.values():
                    diff_records.extend(records)
        return diff_records

    def generate_diff(self, output_path: str):
        """Write the diff PFB to ``output_path`` and a markdown log next to it.

        The PFB reuses the schema of the new file and holds the records of all
        new or changed subjects.  The log lists the removed subjects and the
        number of records written; the same lines are printed to stdout.
        """
        old = self.read_avro(self.old_file)
        new = self.read_avro(self.new_file)

        log_lines = self._describe_removed(old, set(old) - set(new))
        diff_records = self._collect_changed(old, new)
        log_lines.append(f"Total changed/new records in diff: {len(diff_records)}")

        for line in log_lines:
            print(line)

        # Explicit encoding: record values may contain non-ASCII text.
        with open(self._log_path_for(output_path), 'w', encoding='utf-8') as f:
            f.write("# Diff Log\n\n")
            f.write("\n".join(log_lines))

        with open(output_path, 'wb') as out_f:
            with PFBWriter(out_f) as writer:
                # The schema is taken from the new file so the diff can be
                # consumed like any regular export.
                with open(self.new_file, 'rb') as schema_f:
                    with PFBReader(schema_f) as reader:
                        writer.copy_schema(reader)
                writer.write(diff_records)

        print(f"Written to {output_path} with {len(diff_records)} records")

def main():
# EXAMPLE: python pfb_to_zip.py -i ./export_2023-03-27T02_42_17.avro -o ./outputs/ -c ./config.py -d https://portal.pedscommons.org/api/v0/submission/_dictionary/_all -t ncit
# Example for diff: python3 pfb_to_zip.py -i ../new_data.avro -o ../outputs/ -c ./config.py -d ../old_data.avro

parser = argparse.ArgumentParser(description="Build ZIP bundle for data delivery after project request has been approved")
parser.add_argument('-c', '--config', help='The config file')
parser.add_argument('-i', '--input', help='Input PFB file path')
parser.add_argument('-o', '--output', help='Output ZIP directory')
parser.add_argument('-t', '--terminology', help='The ontology you want to transform GEN3 values to.')
parser.add_argument('-a', '--analysis', help='The consorti script you want to execute. Ex:INRG')
parser.add_argument('-d', '--diff', help='Path to the old avro file to diff against')

try:
args = parser.parse_args()
Expand All @@ -359,6 +460,11 @@ def main():
print(err)
parser.print_help()

if args.diff:
diff = Difference(args.diff, input_path)
diff_output = output_path + 'diff.avro'
diff.generate_diff(diff_output)
input_path = diff_output

tmp_folder = "./tmp"
pfb_export = PFBExporter(input_path, tmp_folder, output_path, config_file, ontology, True if analysis_script_consortia and analysis_script_consortia != "" else False)
Expand All @@ -371,16 +477,12 @@ def main():
pfb_export.filter_attributes(is_black_list=False)
# pfb_export.filter_attributes(is_black_list=True)
pfb_export.setup_and_run_analysis(analysis_script_consortia) #TODO how to find consortium
pfb_export.to_ontology_code()
pfb_export.to_ontology_code()
pfb_export.add_external_references_material()
pfb_export.zip()
pfb_export.clean_up()





if __name__ == "__main__":
main()