diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/Copybook.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/Copybook.scala index bb2bf317b..365069b86 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/Copybook.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/Copybook.scala @@ -437,18 +437,27 @@ object Copybook { * Nested field names can contain '.' to identify the exact field. * If the field name is unique '.' is not required. * - * @param field The AST object of the field - * @param recordBytes Binary encoded data of the record - * @param startOffset An offset to the beginning of the field in the data (in bytes). + * @param field The AST object of the field + * @param recordBytes Binary encoded data of the record + * @param configuredStartOffset An offset to the beginning of the field in the data (in bytes). + * @param fieldStartOffsetOverride If this offset is 0 or negative, the field offset defined by the copybook is used. 
+ * Otherwise, use the specified offset * @return The value of the field * */ - def setPrimitiveField(field: Primitive, recordBytes: Array[Byte], value: Any, startOffset: Int = 0): Unit = { + def setPrimitiveField(field: Primitive, recordBytes: Array[Byte], value: Any, configuredStartOffset: Int = 0, fieldStartOffsetOverride: Int = 0): Unit = { field.encode match { case Some(encode) => val fieldBytes = encode(value) - val startByte = field.binaryProperties.offset + startOffset - val endByte = field.binaryProperties.offset + startOffset + field.binaryProperties.actualSize + + val startByte = if (fieldStartOffsetOverride > 0) + fieldStartOffsetOverride + else + field.binaryProperties.offset + configuredStartOffset + val endByte = if (fieldStartOffsetOverride > 0) + fieldStartOffsetOverride + field.binaryProperties.actualSize + else + field.binaryProperties.offset + configuredStartOffset + field.binaryProperties.actualSize if (startByte < 0 || endByte > recordBytes.length) { throw new IllegalArgumentException(s"Cannot set value for field '${field.name}' because the field is out of bounds of the record.") diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala index 3c6e92957..48ba6e424 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala @@ -19,12 +19,29 @@ package za.co.absa.cobrix.cobol.parser.recordformats sealed trait RecordFormat object RecordFormat { - case object FixedLength extends RecordFormat - case object FixedBlock extends RecordFormat - case object VariableLength extends RecordFormat - case object VariableBlock extends RecordFormat - case object AsciiText extends RecordFormat - case object CobrixAsciiText extends RecordFormat + case object FixedLength extends 
RecordFormat { + override def toString: String = "F" + } + + case object FixedBlock extends RecordFormat { + override def toString: String = "FB" + } + + case object VariableLength extends RecordFormat { + override def toString: String = "V" + } + + case object VariableBlock extends RecordFormat { + override def toString: String = "VB" + } + + case object AsciiText extends RecordFormat { + override def toString: String = "D" + } + + case object CobrixAsciiText extends RecordFormat { + override def toString: String = "D2" + } def withNameOpt(s: String): Option[RecordFormat] = { s match { diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala index cea0b29da..17ac03ee0 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala @@ -85,6 +85,7 @@ class DefaultSource CobolParametersValidator.checkSanity(cobolParameters) val readerParameters = CobolParametersParser.getReaderProperties(cobolParameters, None) + CobolParametersValidator.validateParametersForWriting(readerParameters) val outputPath = new Path(path) val hadoopConf = sqlContext.sparkContext.hadoopConfiguration diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala index dff41b35c..a979384ba 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala @@ -16,22 +16,24 @@ package za.co.absa.cobrix.spark.cobol.source.parameters -import java.io.FileNotFoundException -import java.nio.file.{Files, Paths} import 
org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf -import za.co.absa.cobrix.cobol.reader.parameters.CobolParameters +import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser._ -import za.co.absa.cobrix.spark.cobol.utils.ResourceUtils.getClass +import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, ReaderParameters} import za.co.absa.cobrix.spark.cobol.utils.{FileNameUtils, FsType} +import java.io.FileNotFoundException +import java.nio.file.{Files, Paths} +import scala.collection.mutable.ListBuffer + /** * This class provides methods for checking the Spark job options after parsed. */ object CobolParametersValidator { - def checkSanity(params: CobolParameters) = { + def checkSanity(params: CobolParameters): Unit = { if (params.sourcePaths.isEmpty) { throw new IllegalArgumentException("Data source path must be specified.") } @@ -113,4 +115,29 @@ object CobolParametersValidator { validatePath(fileName) } } -} \ No newline at end of file + + def validateParametersForWriting(readerParameters: ReaderParameters): Unit = { + val issues = new ListBuffer[String] + + if (readerParameters.recordFormat != RecordFormat.FixedLength && readerParameters.recordFormat != RecordFormat.VariableLength) { + issues += s"Only '${RecordFormat.FixedLength}' and '${RecordFormat.VariableLength}' values for 'record_format' are supported for writing, " + + s"provided value: '${readerParameters.recordFormat}'" + } + + if (readerParameters.variableSizeOccurs) { + issues += "Variable size OCCURS ('variable_size_occurs = true') is not supported for writing" + } + + if (readerParameters.startOffset != 0 || readerParameters.endOffset != 0) { + issues += "'record_start_offset' and 'record_end_offset' are not supported for writing" + } + + if (readerParameters.fileStartOffset != 0 || readerParameters.fileEndOffset != 0) { + issues += "'file_start_offset' and 
'file_end_offset' are not supported for writing" + } + + if (issues.nonEmpty) { + throw new IllegalArgumentException(s"Writer validation issues: ${issues.mkString("; ")}") + } + } +} diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/BasicRecordCombiner.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/BasicRecordCombiner.scala index 4521d16ae..fdc8554a3 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/BasicRecordCombiner.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/BasicRecordCombiner.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.DataFrame import za.co.absa.cobrix.cobol.parser.Copybook import za.co.absa.cobrix.cobol.parser.ast.datatype.{Decimal, Integral} import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive, Statement} +import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters import za.co.absa.cobrix.cobol.reader.schema.CobolSchema @@ -61,16 +62,62 @@ class BasicRecordCombiner extends RecordCombiner { (idx, position) } - val size = cobolSchema.getRecordSize + val hasRdw = readerParameters.recordFormat == RecordFormat.VariableLength + val isRdwBigEndian = readerParameters.isRdwBigEndian + val adjustment1 = if (readerParameters.isRdwPartRecLength) 4 else 0 + val adjustment2 = readerParameters.rdwAdjustment + + val size = if (hasRdw) { + cobolSchema.getRecordSize + 4 + } else { + cobolSchema.getRecordSize + } + + val startOffset = if (hasRdw) 4 else 0 + + val recordLengthLong = cobolSchema.getRecordSize.toLong + adjustment1.toLong + adjustment2.toLong + if (recordLengthLong < 0) { + throw new IllegalArgumentException( + s"Invalid RDW length $recordLengthLong. Check 'is_rdw_part_of_record_length' and 'rdw_adjustment'." 
+ ) + } + if (isRdwBigEndian && recordLengthLong > 0xFFFFL) { + throw new IllegalArgumentException( + s"RDW length $recordLengthLong exceeds 65535 and cannot be encoded in big-endian mode." + ) + } + if (!isRdwBigEndian && recordLengthLong > Int.MaxValue.toLong) { + throw new IllegalArgumentException( + s"RDW length $recordLengthLong exceeds ${Int.MaxValue} and cannot be encoded safely." + ) + } + val recordLength = recordLengthLong.toInt df.rdd.map { row => val ar = new Array[Byte](size) + if (hasRdw) { + if (isRdwBigEndian) { + ar(0) = ((recordLength >> 8) & 0xFF).toByte + ar(1) = (recordLength & 0xFF).toByte + // The last two bytes are reserved and defined by IBM as binary zeros on all platforms. + ar(2) = 0 + ar(3) = 0 + } else { + ar(0) = (recordLength & 0xFF).toByte + ar(1) = ((recordLength >> 8) & 0xFF).toByte + // This is non-standard. But so are little-endian RDW headers. + // As an advantage, it has no effect for small records but adds support for big records (> 64KB). + ar(2) = ((recordLength >> 16) & 0xFF).toByte + ar(3) = ((recordLength >> 24) & 0xFF).toByte + } + } + sparkFieldPositions.foreach { case (cobolIdx, sparkIdx) => if (!row.isNullAt(sparkIdx)) { val fieldStr = row.get(sparkIdx) val cobolField = cobolFields(cobolIdx) - Copybook.setPrimitiveField(cobolField, ar, fieldStr, 0) + Copybook.setPrimitiveField(cobolField, ar, fieldStr, startOffset) } } diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/VariableLengthEbcdicWriterSuite.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/VariableLengthEbcdicWriterSuite.scala new file mode 100644 index 000000000..2e8a9c555 --- /dev/null +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/VariableLengthEbcdicWriterSuite.scala @@ -0,0 +1,171 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol.writer + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.SaveMode +import org.scalatest.Assertion +import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase +import za.co.absa.cobrix.spark.cobol.source.fixtures.{BinaryFileFixture, TextComparisonFixture} + +class VariableLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase with BinaryFileFixture with TextComparisonFixture { + + import spark.implicits._ + + private val copybookContents = + """ 01 RECORD. + 05 A PIC X(1). + 05 B PIC X(5). 
+ """ + + "cobol writer" should { + "write simple variable -record-length EBCDIC data files with big-endian RDWs" in { + withTempDirectory("cobol_writer1") { tempDir => + val df = List(("A", "First"), ("B", "Scnd"), ("C", "Last")).toDF("A", "B") + + val path = new Path(tempDir, "writer1") + + df.coalesce(1) + .orderBy("A") + .write + .format("cobol") + .mode(SaveMode.Overwrite) + .option("copybook_contents", copybookContents) + .option("record_format", "V") + .option("is_rdw_big_endian", "true") + .save(path.toString) + + val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration) + + assert(fs.exists(path), "Output directory should exist") + val files = fs.listStatus(path) + .filter(_.getPath.getName.startsWith("part-")) + assert(files.nonEmpty, "Output directory should contain part files") + + val partFile = files.head.getPath + val data = fs.open(partFile) + val bytes = new Array[Byte](files.head.getLen.toInt) + data.readFully(bytes) + data.close() + + // Expected EBCDIC data for sample test data + val expected = Array[Byte]( + 0x00.toByte, 0x06.toByte, 0x00.toByte, 0x00.toByte, // RDW1 + 0xC1.toByte, 0xC6.toByte, 0x89.toByte, 0x99.toByte, 0xa2.toByte, 0xa3.toByte, // A,First + 0x00.toByte, 0x06.toByte, 0x00.toByte, 0x00.toByte, // RDW2 + 0xC2.toByte, 0xE2.toByte, 0x83.toByte, 0x95.toByte, 0x84.toByte, 0x40.toByte, // B,Scnd_ + 0x00.toByte, 0x06.toByte, 0x00.toByte, 0x00.toByte, // RDW3 + 0xC3.toByte, 0xD3.toByte, 0x81.toByte, 0xa2.toByte, 0xa3.toByte, 0x40.toByte // C,Last_ + ) + + if (!bytes.sameElements(expected)) { + println(s"Expected bytes: ${expected.map("%02X" format _).mkString(" ")}") + println(s"Actual bytes: ${bytes.map("%02X" format _).mkString(" ")}") + + assert(bytes.sameElements(expected), "Written data should match expected EBCDIC encoding") + } + } + } + + "write simple variable -record-length EBCDIC data files with little-endian RDWs and RDW being part of record length" in { + withTempDirectory("cobol_writer1") { tempDir => + val df = 
List(("A", "First"), ("B", "Scnd"), ("C", "Last")).toDF("A", "B") + + val path = new Path(tempDir, "writer1") + + df.coalesce(1) + .orderBy("A") + .write + .format("cobol") + .mode(SaveMode.Overwrite) + .option("copybook_contents", copybookContents) + .option("record_format", "V") + .option("is_rdw_big_endian", "false") + .option("is_rdw_part_of_record_length", "true") + .save(path.toString) + + val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration) + + assert(fs.exists(path), "Output directory should exist") + val files = fs.listStatus(path) + .filter(_.getPath.getName.startsWith("part-")) + assert(files.nonEmpty, "Output directory should contain part files") + + val partFile = files.head.getPath + val data = fs.open(partFile) + val bytes = new Array[Byte](files.head.getLen.toInt) + data.readFully(bytes) + data.close() + + // Expected EBCDIC data for sample test data + val expected = Array[Byte]( + 0x0A.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, // RDW1 + 0xC1.toByte, 0xC6.toByte, 0x89.toByte, 0x99.toByte, 0xa2.toByte, 0xa3.toByte, // A,First + 0x0A.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, // RDW2 + 0xC2.toByte, 0xE2.toByte, 0x83.toByte, 0x95.toByte, 0x84.toByte, 0x40.toByte, // B,Scnd_ + 0x0A.toByte, 0x00.toByte, 0x00.toByte, 0x00.toByte, // RDW3 + 0xC3.toByte, 0xD3.toByte, 0x81.toByte, 0xa2.toByte, 0xa3.toByte, 0x40.toByte // C,Last_ + ) + + if (!bytes.sameElements(expected)) { + println(s"Expected bytes: ${expected.map("%02X" format _).mkString(" ")}") + println(s"Actual bytes: ${bytes.map("%02X" format _).mkString(" ")}") + + assert(bytes.sameElements(expected), "Written data should match expected EBCDIC encoding") + } + } + } + + "throw an exception on unexpected output record format" in { + withTempDirectory("cobol_writer2") { tempDir => + val df = List(("A", "First"), ("B", "Scnd"), ("C", "Last")).toDF("A", "B") + + val path = new Path(tempDir, "writer2") + + val exception = intercept[IllegalArgumentException] { + df.coalesce(1) + 
.orderBy("A") + .write + .format("cobol") + .mode(SaveMode.Overwrite) + .option("copybook_contents", copybookContents) + .option("record_format", "FB") // Not supported + .option("variable_size_occurs", "true") // Not supported + .option("file_start_offset", "2") // Not supported + .option("record_end_offset", "4") // Not supported + .save(path.toString) + } + + assert(exception.getMessage.contains("Writer validation issues: Only 'F' and 'V' values for 'record_format' are supported for writing, provided value: 'FB';")) + assert(exception.getMessage.contains("Variable size OCCURS ('variable_size_occurs = true') is not supported for writing")) + assert(exception.getMessage.contains("'record_start_offset' and 'record_end_offset' are not supported for writing")) + assert(exception.getMessage.contains("'file_start_offset' and 'file_end_offset' are not supported for writing")) + } + } + } + + def assertArraysEqual(actual: Array[Byte], expected: Array[Byte]): Assertion = { + if (!actual.sameElements(expected)) { + val actualHex = actual.map(b => f"0x$b%02X").mkString(", ") + val expectedHex = expected.map(b => f"0x$b%02X").mkString(", ") + fail(s"Actual: $actualHex\nExpected: $expectedHex") + } else { + succeed + } + } +}