diff --git a/gpu_bdb/queries/load_test/gpu_bdb_load_test.py b/gpu_bdb/queries/load_test/gpu_bdb_load_test.py index ab57193f..1e9dff6f 100755 --- a/gpu_bdb/queries/load_test/gpu_bdb_load_test.py +++ b/gpu_bdb/queries/load_test/gpu_bdb_load_test.py @@ -1,5 +1,6 @@ from bdb_tools.utils import benchmark, gpubdb_argparser, run_query from bdb_tools.readers import build_reader +from cudf.core.dtypes import Decimal64Dtype import os, subprocess, math, time @@ -33,22 +34,21 @@ def get_schema(table): schema = fp.read() names = [line.replace(",", "").split()[0] for line in schema.split("\n")] types = [ - line.replace(",", "") + line.replace(", ", "") .split()[1] .replace("bigint", "int") .replace("string", "str") for line in schema.split("\n") ] - types = [ - col_type.split("(")[0].replace("decimal", "float") for col_type in types - ] + return names, types def read_csv_table(table, chunksize="256 MiB"): # build dict of dtypes to use when reading CSV names, types = get_schema(table) - dtype = {names[i]: types[i] for i in range(0, len(names))} + types_dec_as_str = ['str' if 'decimal' in t else t for t in types] + dtype_dec_as_str = {names[i]: types_dec_as_str[i] for i in range(0, len(names))} data_dir = config["data_dir"].split('parquet_')[0] base_path = f"{data_dir}/data/{table}" @@ -67,7 +67,7 @@ def read_csv_table(table, chunksize="256 MiB"): if "audit" not in fn and os.path.getsize(f"{base_path}/{fn}") > 0 ] df = dask_cudf.read_csv( - paths, sep="|", names=names, dtype=dtype, chunksize=chunksize, quoting=3 + paths, sep="|", names=names, dtype=dtype_dec_as_str, chunksize=chunksize, quoting=3 ) else: paths = [ @@ -83,9 +83,15 @@ def read_csv_table(table, chunksize="256 MiB"): if os.path.getsize(f"{base_path}/{fn}") > 0 ] df = dask_cudf.read_csv( - paths, sep="|", names=names, dtype=types, chunksize=chunksize, quoting=3 + paths, sep="|", names=names, dtype=types_dec_as_str, chunksize=chunksize, quoting=3 ) + for i, dtype in enumerate(types): + if 'decimal' in dtype: + precision = int(dtype[dtype.find("(")+1:dtype.find(",")]) + scale = int(dtype[dtype.find(",")+1:dtype.find(")")]) + df[names[i]] = df[names[i]].astype(Decimal64Dtype(precision, scale)) + return df @@ -133,7 +139,7 @@ def repartition(table, outdir, npartitions=None, chunksize=None, compression="sn def main(client, config): # location you want to write Parquet versions of the table data data_dir = config["data_dir"].split('parquet_')[0] - outdir = f"{data_dir}/parquet_{part_size}gb/" + outdir = f"{data_dir}/parquet_{part_size}gb_decimal/" t0 = time.time() for table in tables: