diff --git a/.github/workflows/jacoco_check.yml b/.github/workflows/jacoco_check.yml index c45097ee6..9d244dcc2 100644 --- a/.github/workflows/jacoco_check.yml +++ b/.github/workflows/jacoco_check.yml @@ -95,10 +95,10 @@ jobs: uses: actions/github-script@v6.4.1 with: script: | - core.setFailed('Changed files coverage is less than ${{ matrix.changedCobolParserOverride }}% for 'cobol-parser'!') + core.setFailed("Changed files coverage is less than ${{ matrix.changedCobolParserOverride }}% for 'cobol-parser'!") - name: Fail PR if changed files coverage for 'spark-cobol' is less than ${{ matrix.changed }}% if: ${{ steps.jacoco.outputs.coverage-changed-files < matrix.changed }} uses: actions/github-script@v6.4.1 with: script: | - core.setFailed('Changed files coverage is less than ${{ matrix.changed }}% for 'spark-cobol!') + core.setFailed("Changed files coverage is less than ${{ matrix.changed }}% for 'spark-cobol'!") diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/Copybook.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/Copybook.scala index 365069b86..36b47667a 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/Copybook.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/Copybook.scala @@ -90,7 +90,7 @@ class Copybook(val ast: CopybookAST) extends Logging with Serializable { def getFieldValueByName(fieldName: String, recordBytes: Array[Byte], startOffset: Int = 0): Any = { val primitive = getPrimitiveFieldByName(fieldName) - extractPrimitiveField(primitive, recordBytes, startOffset) + getPrimitiveField(primitive, recordBytes, startOffset) } /** @@ -203,23 +203,6 @@ class Copybook(val ast: CopybookAST) extends Logging with Serializable { } } - /** - * Get value of a field of the copybook record by the AST object of the field - * - * Nested field names can contain '.' to identify the exact field. - * If the field name is unique '.' is not required. - * - * @param field The AST object of the field - * @param bytes Binary encoded data of the record - * @param startOffset An offset to the beginning of the field in the data (in bytes). - * @return The value of the field - * - */ - def extractPrimitiveField(field: Primitive, bytes: Array[Byte], startOffset: Int = 0): Any = { - val slicedBytes = bytes.slice(field.binaryProperties.offset + startOffset, field.binaryProperties.offset + startOffset + field.binaryProperties.actualSize) - field.decodeTypeValue(0, slicedBytes) - } - /** This routine is used for testing by generating a layout position information to compare with mainframe output */ def generateRecordLayoutPositions(): String = { var fieldCounter: Int = 0 @@ -431,6 +414,28 @@ object Copybook { new Copybook(schema) } + /** + * Get value of a field of the copybook record by the AST object of the field + * + * Nested field names can contain '.' to identify the exact field. + * If the field name is unique '.' is not required. + * + * @param field The AST object of the field + * @param bytes Binary encoded data of the record + * @param startOffset An offset to the beginning of the field in the data (in bytes). + * @return The value of the field + * + */ + def getPrimitiveField(field: Primitive, bytes: Array[Byte], startOffset: Int = 0): Any = { + val slicedBytes = bytes.slice(field.binaryProperties.offset + startOffset, field.binaryProperties.offset + startOffset + field.binaryProperties.actualSize) + field.decodeTypeValue(0, slicedBytes) + } + + /** Same as getPrimitiveField(). The original method is left for backwards compatibility. */ + def extractPrimitiveField(field: Primitive, bytes: Array[Byte], startOffset: Int = 0): Any = { + getPrimitiveField(field, bytes, startOffset) + } + /** * Set value of a field of the copybook record by the AST object of the field * diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala index 794172a2b..e1627c626 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala @@ -103,7 +103,7 @@ case class Primitive( /** Returns a string representation of the field */ override def toString: String = { - s"${" " * 2 * level}$camelCased ${camelCase(redefines.getOrElse(""))} $dataType" + s"${" " * 2 * level}$name ${redefines.getOrElse("")} $dataType" } /** Returns true if the field is a child segment */ diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Statement.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Statement.scala index 65043d15c..cc833ccec 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Statement.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Statement.scala @@ -18,10 +18,6 @@ package za.co.absa.cobrix.cobol.parser.ast /** Trait for Cobol copybook AST element (a statement). */ trait Statement { - val camelCased: String = { - camelCase(name) - } - /** Returns the level of the AST element */ def level: Int @@ -94,15 +90,7 @@ trait Statement { /** Returns a string representation of the AST element */ override def toString: String = { - s"${" " * 2 * level}$camelCased ${camelCase(redefines.getOrElse(""))}" - } - - /** Returns this the name of this fields as a camel cased string */ - def camelCase(s: String): String = { - s.replace(".", "") - .split("-") - .map(c => c.toLowerCase.capitalize) - .mkString + s"${" " * 2 * level}$name ${redefines.getOrElse("")}" } /** Returns the original AST element with updated binary properties */ diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedWithRecordLengthExprRawRecordExtractor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedWithRecordLengthExprRawRecordExtractor.scala index df54983cc..dd53c91c3 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedWithRecordLengthExprRawRecordExtractor.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedWithRecordLengthExprRawRecordExtractor.scala @@ -17,6 +17,7 @@ package za.co.absa.cobrix.cobol.reader.extractors.raw import org.slf4j.LoggerFactory +import za.co.absa.cobrix.cobol.parser.Copybook import za.co.absa.cobrix.cobol.parser.ast.Primitive import za.co.absa.cobrix.cobol.reader.iterator.RecordLengthExpression import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters @@ -123,7 +124,7 @@ class FixedWithRecordLengthExprRawRecordExtractor(ctx: RawRecordContext, final private def getRecordLengthFromField(lengthAST: Primitive, binaryDataStart: Array[Byte]): Int = { val length = if (isLengthMapEmpty) { - ctx.copybook.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match { + Copybook.getPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match { case i: Int => i case l: Long => l.toInt case s: String => Try{ s.toInt }.getOrElse(throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type, encountered: '$s'.")) @@ -132,7 +133,7 @@ class FixedWithRecordLengthExprRawRecordExtractor(ctx: RawRecordContext, case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.") } } else { - ctx.copybook.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match { + Copybook.getPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match { case i: Int => getRecordLengthFromMapping(i.toString) case l: Long => getRecordLengthFromMapping(l.toString) case d: BigDecimal => getRecordLengthFromMapping(d.toString()) @@ -165,7 +166,7 @@ class FixedWithRecordLengthExprRawRecordExtractor(ctx: RawRecordContext, expr.fields.foreach{ case (name, field) => - val obj = ctx.copybook.extractPrimitiveField(field, binaryDataStart, readerProperties.startOffset) + val obj = Copybook.getPrimitiveField(field, binaryDataStart, readerProperties.startOffset) try { obj match { case i: Int => evaluator.setValue(name, i) @@ -194,7 +195,7 @@ class FixedWithRecordLengthExprRawRecordExtractor(ctx: RawRecordContext, private def getSegmentId(data: Array[Byte]): Option[String] = { segmentIdField.map(field => { - val fieldValue = ctx.copybook.extractPrimitiveField(field, data, readerProperties.startOffset) + val fieldValue = Copybook.getPrimitiveField(field, data, readerProperties.startOffset) if (fieldValue == null) { log.error(s"An unexpected null encountered for segment id at $byteIndex") "" diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala index c1b48c983..96db4690b 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala @@ -99,14 +99,14 @@ object IndexGenerator extends Logging { } else { if (isValid) { if (isReallyHierarchical && rootRecordId.isEmpty) { - val curSegmentId = getSegmentId(copybook.get, segmentField.get, record) + val curSegmentId = getSegmentId(segmentField.get, record) if ((curSegmentId.nonEmpty && rootSegmentIds.isEmpty) || (rootSegmentIds.nonEmpty && rootSegmentIds.contains(curSegmentId))) { rootRecordId = curSegmentId } } if (canSplit && needSplit(recordsInChunk, bytesInChunk)) { - if (!isReallyHierarchical || isSegmentGoodForSplit(rootSegmentIds, copybook.get, segmentField.get, record)) { + if (!isReallyHierarchical || isSegmentGoodForSplit(rootSegmentIds, segmentField.get, record)) { val indexEntry = SparseIndexEntry(byteIndex, -1, fileId, recordIndex) val len = index.length // Do not add an entry if we are still at the same position as the previous entry. @@ -157,15 +157,14 @@ object IndexGenerator extends Logging { } private def isSegmentGoodForSplit(rootSegmentIds: List[String], - copybook: Copybook, segmentField: Primitive, record: Array[Byte]): Boolean = { - val segmentId = getSegmentId(copybook, segmentField, record) + val segmentId = getSegmentId(segmentField, record) rootSegmentIds.contains(segmentId) } - private def getSegmentId(copybook: Copybook, segmentIdField: Primitive, data: Array[Byte]): String = { - val v = copybook.extractPrimitiveField(segmentIdField, data) + private def getSegmentId(segmentIdField: Primitive, data: Array[Byte]): String = { + val v = Copybook.getPrimitiveField(segmentIdField, data) if (v == null) "" else v.toString.trim } } diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala index ec4cda4fa..7e85cb8e0 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala @@ -17,6 +17,7 @@ package za.co.absa.cobrix.cobol.reader.iterator import za.co.absa.cobrix.cobol.internal.Logging +import za.co.absa.cobrix.cobol.parser.Copybook import za.co.absa.cobrix.cobol.reader.extractors.record.{RecordExtractors, RecordHandler} import za.co.absa.cobrix.cobol.reader.parameters.{CorruptFieldsPolicy, ReaderParameters} import za.co.absa.cobrix.cobol.reader.schema.CobolSchema @@ -108,7 +109,7 @@ class FixedLenNestedRowIterator[T: ClassTag]( private def getSegmentId(data: Array[Byte], offset: Int): Option[String] = { segmentIdField.map(field => { - val fieldValue = cobolSchema.copybook.extractPrimitiveField(field, data, offset) + val fieldValue = Copybook.getPrimitiveField(field, data, offset) if (fieldValue == null) { logger.error(s"An unexpected null encountered for segment id at $byteIndex") "" diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala index 716d473e5..88a00bb68 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala @@ -18,10 +18,9 @@ package za.co.absa.cobrix.cobol.reader.iterator import za.co.absa.cobrix.cobol.internal.Logging import za.co.absa.cobrix.cobol.parser.Copybook -import za.co.absa.cobrix.cobol.parser.ast.Primitive import za.co.absa.cobrix.cobol.parser.headerparsers.RecordHeaderParser -import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor +import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters import za.co.absa.cobrix.cobol.reader.stream.SimpleStream import za.co.absa.cobrix.cobol.reader.validator.ReaderParametersValidator @@ -143,7 +142,7 @@ class VRLRecordReader(cobolSchema: Copybook, private def getSegmentId(data: Array[Byte]): Option[String] = { segmentIdField.map(field => { - val fieldValue = cobolSchema.extractPrimitiveField(field, data, readerProperties.startOffset) + val fieldValue = Copybook.getPrimitiveField(field, data, readerProperties.startOffset) if (fieldValue == null) { logger.error(s"An unexpected null encountered for segment id at $byteIndex") "" diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/parser/extract/BinaryExtractorSpec.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/parser/extract/BinaryExtractorSpec.scala index 72ad1252a..43dbc850b 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/parser/extract/BinaryExtractorSpec.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/parser/extract/BinaryExtractorSpec.scala @@ -17,12 +17,12 @@ package za.co.absa.cobrix.cobol.parser.extract import org.scalatest.funsuite.AnyFunSuite -import za.co.absa.cobrix.cobol.parser.CopybookParser import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, CobolType} import za.co.absa.cobrix.cobol.parser.ast.{BinaryProperties, Group, Primitive} import za.co.absa.cobrix.cobol.parser.decoders.DecoderSelector import za.co.absa.cobrix.cobol.parser.encoding.{EBCDIC, EncoderSelector} import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy +import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser} class BinaryExtractorSpec extends AnyFunSuite { @@ -113,7 +113,7 @@ class BinaryExtractorSpec extends AnyFunSuite { 0x00.toByte, 0x00.toByte, 0x2F.toByte ) - val copybook = CopybookParser.parseTree(copyBookContents) + val copybook: Copybook = CopybookParser.parseTree(copyBookContents) val startOffset: Int = 0 test("Test extract primitive field") { @@ -121,7 +121,7 @@ class BinaryExtractorSpec extends AnyFunSuite { // using getFieldByName val statement = copybook.getFieldByName("ID") val field: Primitive = statement.asInstanceOf[Primitive] - val result: Any = copybook.extractPrimitiveField(field, bytes, startOffset) + val result: Any = Copybook.getPrimitiveField(field, bytes, startOffset) assert(result.asInstanceOf[Int] === 6) // traverse AST and extract all primitives to map @@ -130,7 +130,7 @@ class BinaryExtractorSpec extends AnyFunSuite { def traverseAst(group: Group): Unit = { for (child <- group.children) { if (child.isInstanceOf[Primitive]) { - extractedData += (child.name -> copybook.extractPrimitiveField(child.asInstanceOf[Primitive], + extractedData += (child.name -> Copybook.extractPrimitiveField(child.asInstanceOf[Primitive], bytes, startOffset)) } else { assert(child.isInstanceOf[Group] === true) @@ -162,7 +162,7 @@ class BinaryExtractorSpec extends AnyFunSuite { val primitive: Primitive = Primitive(level, name, name, lineNumber, dataType, redefines, isRedefined, occurs, to, dependingOn, Map(), isDependee, isFiller, DecoderSelector.getDecoder(dataType), EncoderSelector.getEncoder(dataType), binaryProperties)(None) - val result2: Any = copybook.extractPrimitiveField(primitive, bytes, startOffset) + val result2: Any = Copybook.extractPrimitiveField(primitive, bytes, startOffset) assert(result2.asInstanceOf[String] === "EXAMPLE4") } diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala index a979384ba..bf7b703ee 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala @@ -18,7 +18,7 @@ package za.co.absa.cobrix.spark.cobol.source.parameters import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.SparkConf +import org.slf4j.LoggerFactory import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser._ import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, ReaderParameters} @@ -32,7 +32,18 @@ import scala.collection.mutable.ListBuffer * This class provides methods for checking the Spark job options after parsed. */ object CobolParametersValidator { - + private val log = LoggerFactory.getLogger(this.getClass) + + /** + * Validates the provided COBOL processing parameters and throws an exception + * if any inconsistency or missing configuration is detected. + * + * This method checks only if the copybook is defined and not ambiguous. + * + * @param params the COBOL processing parameters to be validated + * @return this method does not return any value. It throws an IllegalArgumentException if + * the validation fails. + */ def checkSanity(params: CobolParameters): Unit = { if (params.sourcePaths.isEmpty) { throw new IllegalArgumentException("Data source path must be specified.") @@ -43,12 +54,19 @@ object CobolParametersValidator { } } - def validateOrThrow(sparkConf: SparkConf, hadoopConf: Configuration): Unit = { - val parameters = Map[String, String](PARAM_COPYBOOK_PATH -> sparkConf.get(PARAM_COPYBOOK_PATH), PARAM_SOURCE_PATH -> sparkConf.get - (PARAM_SOURCE_PATH)) - validateOrThrow(parameters, hadoopConf) - } - + /** + * Validates the provided parameters for processing COBOL data and throws an exception + * if any inconsistency or issue is detected in the configurations. + * + * The method checks for the presence and validity of parameters required for data input + * and ensures that conflicting or missing configurations are handled correctly. It also + * verifies the validity and accessibility of copybook files when specified. + * + * @param parameters a map of configuration options to be validated. + * @param hadoopConf the Hadoop configuration object used to support filesystem operations + * when validating copybook paths in distributed storage systems. + * @return this method does not return any value. It throws an exception if any validation fails. + */ def validateOrThrow(parameters: Map[String, String], hadoopConf: Configuration): Unit = { val copyBookContents = parameters.get(PARAM_COPYBOOK_CONTENTS) val copyBookPathFileName = parameters.get(PARAM_COPYBOOK_PATH) @@ -116,6 +134,14 @@ object CobolParametersValidator { } } + /** + * Validates the parameters provided for writing operations in COBOL data processing. + * Verifies that the specified configurations are compatible with the writer and throws + * an exception in case of violations. + * + * @param readerParameters the configuration containing options to be validated for writing operations. + * @return this method does not return any value but throws an IllegalArgumentException if the validation fails. + */ def validateParametersForWriting(readerParameters: ReaderParameters): Unit = { val issues = new ListBuffer[String] @@ -124,8 +150,13 @@ object CobolParametersValidator { s"provided value: '${readerParameters.recordFormat}'" } - if (readerParameters.variableSizeOccurs) { - issues += "Variable size OCCURS ('variable_size_occurs = true') is not supported for writing" + if (readerParameters.variableSizeOccurs && + readerParameters.recordFormat == RecordFormat.FixedLength) { + log.warn("Option 'variable_size_occurs=true' is used with 'record_format=F' which means records can have variable length. It is highly recommended to use 'record_format=V' instead.") + } + + if (readerParameters.occursMappings.nonEmpty) { + issues += "OCCURS mapping option ('occurs_mappings') is not supported for writing" } if (readerParameters.startOffset != 0 || readerParameters.endOffset != 0) { @@ -136,6 +167,10 @@ object CobolParametersValidator { issues += "'file_start_offset' and 'file_end_offset' are not supported for writing" } + if (readerParameters.multisegment.isDefined) { + issues += "Multi-segment options ('segment_field', 'segment_filter', etc) are not supported for writing" + } + if (issues.nonEmpty) { throw new IllegalArgumentException(s"Writer validation issues: ${issues.mkString("; ")}") } diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/CobolStreamer.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/CobolStreamer.scala index 856329ee7..ff452639a 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/CobolStreamer.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/CobolStreamer.scala @@ -20,12 +20,8 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.DStream -import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat -import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage -import za.co.absa.cobrix.cobol.parser.policies.{FillerNamingPolicy, StringTrimmingPolicy} -import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters -import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser._ +import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters import za.co.absa.cobrix.spark.cobol.reader.{FixedLenNestedReader, FixedLenReader} import za.co.absa.cobrix.spark.cobol.source.parameters.CobolParametersValidator import za.co.absa.cobrix.spark.cobol.utils.HDFSUtils @@ -43,10 +39,15 @@ object CobolStreamer { } implicit class Deserializer(@transient val ssc: StreamingContext) extends Serializable { + val parameters: Map[String, String] = Map[String, String]( + PARAM_COPYBOOK_PATH -> ssc.sparkContext.getConf.get(PARAM_COPYBOOK_PATH), + PARAM_SOURCE_PATH -> ssc.sparkContext.getConf.get(PARAM_SOURCE_PATH) + ) + + CobolParametersValidator.validateOrThrow(parameters, ssc.sparkContext.hadoopConfiguration) + + val reader: FixedLenReader = CobolStreamer.getReader(ssc) - CobolParametersValidator.validateOrThrow(ssc.sparkContext.getConf, ssc.sparkContext.hadoopConfiguration) - val reader = CobolStreamer.getReader(ssc) - def cobolStream(): DStream[Row] = { ssc .binaryRecordsStream(ssc.sparkContext.getConf.get(PARAM_SOURCE_PATH), reader.getCobolSchema.getRecordSize) @@ -59,4 +60,4 @@ object CobolStreamer { } } -} \ No newline at end of file +} diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/DependingOnField.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/DependingOnField.scala new file mode 100644 index 000000000..32d6d4626 --- /dev/null +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/DependingOnField.scala @@ -0,0 +1,21 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol.writer + +import za.co.absa.cobrix.cobol.parser.ast.Primitive + +case class DependingOnField(cobolField: Primitive, var baseOffset: Int) diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala index 44adf6d59..82c636816 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala @@ -34,6 +34,14 @@ class NestedRecordCombiner extends RecordCombiner { import NestedRecordCombiner._ + /** + * Converts Spark DataFrame to the RDD with data in mainframe format as arrays of bytes, each array being a record. + * + * @param df The input DataFrame + * @param cobolSchema The output COBOL schema + * @param readerParameters The reader properties which are actually writer properties parsed as spark-cobol options + * @return The RDD of records in mainframe format + */ override def combine(df: DataFrame, cobolSchema: CobolSchema, readerParameters: ReaderParameters): RDD[Array[Byte]] = { val hasRdw = readerParameters.recordFormat == RecordFormat.VariableLength val isRdwBigEndian = readerParameters.isRdwBigEndian @@ -64,15 +72,25 @@ class NestedRecordCombiner extends RecordCombiner { s"RDW length $recordLengthLong exceeds ${Int.MaxValue} and cannot be encoded safely." ) } - val recordLength = recordLengthLong.toInt - - processRDD(df.rdd, cobolSchema.copybook, df.schema, size, recordLength, startOffset, hasRdw, isRdwBigEndian) + processRDD(df.rdd, cobolSchema.copybook, df.schema, size, adjustment1 + adjustment2, startOffset, hasRdw, isRdwBigEndian, readerParameters.variableSizeOccurs) } } object NestedRecordCombiner { private val log = LoggerFactory.getLogger(this.getClass) + /** + * Generates a field definition string containing the PIC clause and USAGE clause for a primitive COBOL field. + * + * This method extracts the picture clause (PIC) and usage information from the field's data type + * and combines them into a single definition string. For integral and decimal types with compact + * encoding, it includes the USAGE clause; otherwise, the default DISPLAY usage is assumed or omitted. + * + * The purpose is to render COBOL field name and type in exceptions and log messages. + * + * @param field The primitive field whose definition string should be generated + * @return A string containing the PIC clause and optional USAGE clause, with any trailing whitespace trimmed + */ def getFieldDefinition(field: Primitive): String = { val pic = field.dataType.originalPic.getOrElse(field.dataType.pic) @@ -85,41 +103,101 @@ object NestedRecordCombiner { s"$pic $usage".trim } + /** + * Constructs a writer AST (Abstract Syntax Tree) from a copybook and Spark schema for serialization purposes. + * + * This method creates a hierarchical structure of WriterAst nodes that maps the copybook structure to the + * corresponding Spark schema. The resulting GroupField can be used to serialize Spark Rows into binary format + * according to the copybook specification. The AST contains getter functions that extract values from Rows + * and metadata needed to write those values to the correct byte positions in the output buffer. + * + * The purpose of WriterAst class hierarchy is to provide memory and CPU efficient way of creating binary + * records from Spark dataframes. It links Cobol schema and Spark schema in a single tree. + * + * @param copybook The copybook definition describing the binary record layout and field specifications + * @param schema The Spark StructType schema that corresponds to the structure of the data to be written + * @return A GroupField representing the root of the writer AST, containing all non-filler, non-redefines + * fields with their associated getter functions and position information for binary serialization + */ def constructWriterAst(copybook: Copybook, schema: StructType): GroupField = { - buildGroupField(getAst(copybook), schema, row => row) + buildGroupField(getAst(copybook), schema, row => row, "", new mutable.HashMap[String, DependingOnField]()) } - def processRDD(rdd: RDD[Row], copybook: Copybook, schema: StructType, recordSize: Int, recordLengthHeader: Int, startOffset: Int, hasRdw: Boolean, isRdwBigEndian: Boolean): RDD[Array[Byte]] = { + /** + * Processes an RDD of Spark Rows and converts them into an RDD of byte arrays according to the copybook specification. + * + * The resulting RDD can then be written to storage as files in mainframe format (usually, EBCDIC). + * + * Each Row is transformed into a fixed or variable-length byte array representation based on the copybook layout. + * + * Variable-record-length records supported are ones that have RDW headers (big-endian or little-endian). + * For variable-length records with OCCURS DEPENDING ON, the output may be trimmed to the actual bytes written. + * + * @param rdd The input RDD containing Spark Rows to be converted to binary format + * @param copybook The copybook definition that describes the binary record layout and field specifications + * @param schema The Spark StructType schema that corresponds to the structure of the input Rows + * @param recordSize The maximum size in bytes allocated for each output record + * @param recordLengthAdj An adjustment value added to the bytes written when computing the RDW length field + * @param startOffset The byte offset at which data writing should begin, typically 0 for fixed-length or 4 for RDW records + * @param hasRdw A flag indicating whether to prepend a Record Descriptor Word header to each output record + * @param isRdwBigEndian A flag indicating the byte order for the RDW header, true for big-endian, false for little-endian + * @param variableSizeOccurs A flag indicating whether OCCURS DEPENDING ON fields should use actual element counts rather than maximum sizes + * @return An RDD of byte arrays, where each array represents one record in binary format according to the copybook specification + */ + private[cobrix] def processRDD(rdd: RDD[Row], + copybook: Copybook, + schema: StructType, + recordSize: Int, + recordLengthAdj: Int, + startOffset: Int, + hasRdw: Boolean, + isRdwBigEndian: Boolean, + variableSizeOccurs: Boolean): RDD[Array[Byte]] = { val writerAst = constructWriterAst(copybook, schema) rdd.mapPartitions { rows => rows.map { row => val ar = new Array[Byte](recordSize) + val bytesWritten = writeToBytes(writerAst, row, ar, startOffset, variableSizeOccurs) + if (hasRdw) { + val recordLengthToWriteToRDW = bytesWritten + recordLengthAdj + if (isRdwBigEndian) { - ar(0) = ((recordLengthHeader >> 8) & 0xFF).toByte - ar(1) = (recordLengthHeader & 0xFF).toByte + ar(0) = ((recordLengthToWriteToRDW >> 8) & 0xFF).toByte + ar(1) = (recordLengthToWriteToRDW & 0xFF).toByte // The last two bytes are reserved and defined by IBM as binary zeros on all platforms. ar(2) = 0 ar(3) = 0 } else { - ar(0) = (recordLengthHeader & 0xFF).toByte - ar(1) = ((recordLengthHeader >> 8) & 0xFF).toByte + ar(0) = (recordLengthToWriteToRDW & 0xFF).toByte + ar(1) = ((recordLengthToWriteToRDW >> 8) & 0xFF).toByte // This is non-standard. But so are little-endian RDW headers. // As an advantage, it has no effect for small records but adds support for big records (> 64KB). - ar(2) = ((recordLengthHeader >> 16) & 0xFF).toByte - ar(3) = ((recordLengthHeader >> 24) & 0xFF).toByte + ar(2) = ((recordLengthToWriteToRDW >> 16) & 0xFF).toByte + ar(3) = ((recordLengthToWriteToRDW >> 24) & 0xFF).toByte } } - writeToBytes(writerAst, row, ar, startOffset) - - ar + if (!variableSizeOccurs || recordSize == bytesWritten + startOffset) { + ar + } else { + java.util.Arrays.copyOf(ar, bytesWritten + startOffset) + } } } } + /** + * Retrieves the appropriate AST (Abstract Syntax Tree) group from a copybook. + * If the root AST has exactly one child and that child is a Group, returns that child. + * Otherwise, returns the root AST itself. This normalization ensures consistent handling + * of copybook structures regardless of whether they have a single top-level group or multiple elements. + * + * @param copybook The copybook object containing the AST structure to extract from + * @return The normalized Group representing the copybook structure, either the single child group or the root AST + */ def getAst(copybook: Copybook): Group = { val rootAst = copybook.ast @@ -134,20 +212,21 @@ object NestedRecordCombiner { * Recursively walks the copybook group and the Spark StructType in lockstep, producing * [[WriterAst]] nodes whose getters extract the correct value from a [[org.apache.spark.sql.Row]]. * - * @param group A copybook Group node whose children will be processed. - * @param schema The Spark StructType that corresponds to `group`. - * @param getter A function that, given the "outer" Row, returns the Row that belongs to this group. - * @param path The path to the field + * @param group A copybook Group node whose children will be processed. + * @param schema The Spark StructType that corresponds to `group`. + * @param getter A function that, given the "outer" Row, returns the Row that belongs to this group. + * @param path The path to the field + * @param dependeeMap A map of field names to their corresponding DependingOnField specs, used to resolve dependencies for OCCURS DEPENDING ON fields. * @return A [[GroupField]] covering all non-filler, non-redefines children found in both * the copybook and the Spark schema. */ - private def buildGroupField(group: Group, schema: StructType, getter: GroupGetter, path: String = ""): GroupField = { + private def buildGroupField(group: Group, schema: StructType, getter: GroupGetter, path: String, dependeeMap: mutable.HashMap[String, DependingOnField]): GroupField = { val children = group.children.withFilter { stmt => stmt.redefines.isEmpty }.map { case s if s.isFiller => Filler(s.binaryProperties.actualSize) - case p: Primitive => buildPrimitiveNode(p, schema, path) - case g: Group => buildGroupNode(g, schema, path) + case p: Primitive => buildPrimitiveNode(p, schema, path, dependeeMap) + case g: Group => buildGroupNode(g, schema, path, dependeeMap) } GroupField(children.toSeq, group, getter) } @@ -158,7 +237,20 @@ object NestedRecordCombiner { * * Returns a filler when the field is absent from the schema (e.g. filtered out during reading). */ - private def buildPrimitiveNode(p: Primitive, schema: StructType, path: String = ""): WriterAst = { + private def buildPrimitiveNode(p: Primitive, schema: StructType, path: String, dependeeMap: mutable.HashMap[String, DependingOnField]): WriterAst = { + def addDependee(): DependingOnField = { + val spec = DependingOnField(p, p.binaryProperties.offset) + val uppercaseName = p.name.toUpperCase() + if (dependeeMap.contains(uppercaseName)) { + throw new IllegalArgumentException(s"Duplicate field name '${p.name}' found in copybook. " + + s"Field names must be unique (case-insensitive) when OCCURS DEPENDING ON is used. " + + s"Already found a dependee field with the same name at line ${dependeeMap(uppercaseName).cobolField.lineNumber}, " + + s"current field line number: ${p.lineNumber}.") + } + dependeeMap += (p.name.toUpperCase -> spec) + spec + } + val fieldName = p.name val fieldIndexOpt = schema.fields.zipWithIndex.find { case (field, _) => field.name.equalsIgnoreCase(fieldName) @@ -172,13 +264,27 @@ object NestedRecordCombiner { } if (p.occurs.isDefined) { // Array of primitives - PrimitiveArray(p, row => row.getAs[mutable.WrappedArray[AnyRef]](idx)) + val dependingOnField = p.dependingOn.map { dependingOn => + dependeeMap.getOrElse(dependingOn.toUpperCase, throw new IllegalStateException( + s"Array field '${p.name}' depends on '$dependingOn' which is not found among previously processed fields." + )) + } + PrimitiveArray(p, row => row.getAs[mutable.WrappedArray[AnyRef]](idx), dependingOnField) } else { - PrimitiveField(p, row => row.get(idx)) + if (p.isDependee) { + PrimitiveDependeeField(addDependee()) + } else { + PrimitiveField(p, row => row.get(idx)) + } } }.getOrElse { - log.error(s"Field '$path${p.name}' is not found in Spark schema. Will be replaced by filler.") - Filler(p.binaryProperties.actualSize) + // Dependee fields need not be defined in Spark schema. + if (p.isDependee) { + PrimitiveDependeeField(addDependee()) + } else { + log.error(s"Field '$path${p.name}' is not found in Spark schema. Will be replaced by filler.") + Filler(p.binaryProperties.actualSize) + } } } @@ -189,7 +295,7 @@ object NestedRecordCombiner { * * Returns a filler when the field is absent from the schema. */ - private def buildGroupNode(g: Group, schema: StructType, path: String = ""): WriterAst = { + private def buildGroupNode(g: Group, schema: StructType, path: String, dependeeMap: mutable.HashMap[String, DependingOnField]): WriterAst = { val fieldName = g.name val fieldIndexOpt = schema.fields.zipWithIndex.find { case (field, _) => field.name.equalsIgnoreCase(fieldName) @@ -200,8 +306,13 @@ object NestedRecordCombiner { // Array of structs – the element type must be a StructType schema(idx).dataType match { case ArrayType(elementType: StructType, _) => - val childAst = buildGroupField(g, elementType, row => row, s"$path${g.name}.") - GroupArray(childAst, g, row => row.getAs[mutable.WrappedArray[AnyRef]](idx)) + val dependingOnField = g.dependingOn.map { dependingOn => + dependeeMap.getOrElse(dependingOn.toUpperCase, throw new IllegalStateException( + s"Array group '${g.name}' depends on '$dependingOn' which is not found among previously processed fields." + )) + } + val childAst = buildGroupField(g, elementType, row => row, s"$path${g.name}.", dependeeMap) + GroupArray(childAst, g, row => row.getAs[mutable.WrappedArray[AnyRef]](idx), dependingOnField) case other => throw new IllegalArgumentException( s"Expected ArrayType(StructType) for group field '${g.name}' with OCCURS, but got $other") @@ -211,7 +322,7 @@ object NestedRecordCombiner { schema(idx).dataType match { case nestedSchema: StructType => val childGetter: GroupGetter = row => row.getAs[Row](idx) - val childAst = buildGroupField(g, nestedSchema, childGetter, s"$path${g.name}.") + val childAst = buildGroupField(g, nestedSchema, childGetter, s"$path${g.name}.", dependeeMap) GroupField(childAst.children, g, childGetter) case other => throw new IllegalArgumentException( @@ -235,14 +346,17 @@ object NestedRecordCombiner { * supplied. The row array may contain fewer elements than the copybook allows — any * missing tail elements are silently skipped, leaving those bytes as zeroes. * - * @param ast The [[WriterAst]] node to process. - * @param row The Spark [[Row]] from which values are read. - * @param ar The target byte array (record buffer). - * @param currentOffset RDW prefix length (0 for fixed-length records, 4 for variable). + * @param ast The [[WriterAst]] node to process. + * @param row The Spark [[Row]] from which values are read. + * @param ar The target byte array (record buffer). + * @param currentOffset RDW prefix length (0 for fixed-length records, 4 for variable). + * @param variableLengthOccurs A flag indicating whether size of OCCURS DEPENDING ON should match the number of elements + * and not always fixed. + * @throws IllegalArgumentException if a field value cannot be encoded according to the copybook definition. */ - private def writeToBytes(ast: WriterAst, row: Row, ar: Array[Byte], currentOffset: Int): Int = { + private def writeToBytes(ast: WriterAst, row: Row, ar: Array[Byte], currentOffset: Int, variableLengthOccurs: Boolean): Int = { ast match { - // ── Filler ────────────────────────────────────────────────────── + // ── Filler ────────────────────────────────────────────────────────────── case Filler(size) => size // ── Plain primitive ────────────────────────────────────────────────────── @@ -253,19 +367,30 @@ object NestedRecordCombiner { } cobolField.binaryProperties.actualSize + // ── Primitive which has an OCCURS DEPENDS ON ───────────────────────────── + case PrimitiveDependeeField(spec) => + // NOTE: baseOffset is mutated here for each row. This is safe because rows + // are processed sequentially within mapPartitions, and the offset is always + // updated before being read in subsequent array-element writes. + spec.baseOffset = currentOffset + spec.cobolField.binaryProperties.actualSize + // ── Plain nested group ─────────────────────────────────────────────────── case GroupField(children, cobolField, getter) => val nestedRow = getter(row) if (nestedRow != null) { var writtenBytes = 0 - children.foreach(child => - writtenBytes += writeToBytes(child, nestedRow, ar, currentOffset + writtenBytes) - ) + children.foreach { child => + val written = writeToBytes(child, nestedRow, ar, currentOffset + writtenBytes, variableLengthOccurs) + writtenBytes += written + } + writtenBytes + } else { + cobolField.binaryProperties.actualSize } - cobolField.binaryProperties.actualSize // ── Array of primitives (OCCURS on a primitive field) ─────────────────── - case PrimitiveArray(cobolField, arrayGetter) => + case PrimitiveArray(cobolField, arrayGetter, dependingOn) => val arr = arrayGetter(row) if (arr != null) { val maxElements = cobolField.arrayMaxSize // copybook upper bound @@ -284,11 +409,24 @@ object NestedRecordCombiner { } i += 1 } + dependingOn.foreach(spec => + Copybook.setPrimitiveField(spec.cobolField, ar, elementsToWrite, fieldStartOffsetOverride = spec.baseOffset) + ) + if (variableLengthOccurs) { + // For variable-length OCCURS, the actual size is determined by the number of elements written. + elementSize * elementsToWrite + } else { + cobolField.binaryProperties.actualSize + } + } else { + dependingOn.foreach(spec => + Copybook.setPrimitiveField(spec.cobolField, ar, 0, fieldStartOffsetOverride = spec.baseOffset) + ) + if (variableLengthOccurs) 0 else cobolField.binaryProperties.actualSize } - cobolField.binaryProperties.actualSize // ── Array of groups (OCCURS on a group field) ─────────────────────────── - case GroupArray(groupField: GroupField, cobolField, arrayGetter) => + case GroupArray(groupField: GroupField, cobolField, arrayGetter, dependingOn) => val arr = arrayGetter(row) if (arr != null) { val maxElements = cobolField.arrayMaxSize // copybook upper bound @@ -303,12 +441,25 @@ object NestedRecordCombiner { // Build an adjusted element offset so that each child's base offset // (which is relative to the group's base) lands at the correct position in ar. val elementStartOffset = baseOffset + i * elementSize - writeToBytes(groupField, elementRow, ar, elementStartOffset) + writeToBytes(groupField, elementRow, ar, elementStartOffset, variableLengthOccurs) } i += 1 } + dependingOn.foreach(spec => + Copybook.setPrimitiveField(spec.cobolField, ar, elementsToWrite, fieldStartOffsetOverride = spec.baseOffset) + ) + if (variableLengthOccurs) { + // For variable-length OCCURS, the actual size is determined by the number of elements written. + elementSize * elementsToWrite + } else { + cobolField.binaryProperties.actualSize + } + } else { + dependingOn.foreach(spec => + Copybook.setPrimitiveField(spec.cobolField, ar, 0, fieldStartOffsetOverride = spec.baseOffset) + ) + if (variableLengthOccurs) 0 else cobolField.binaryProperties.actualSize } - cobolField.binaryProperties.actualSize } } } diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/WriterAst.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/WriterAst.scala index 8e280a09b..fab1c01a3 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/WriterAst.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/WriterAst.scala @@ -23,6 +23,29 @@ import scala.collection.mutable sealed trait WriterAst +/** + * Represents an abstract syntax tree (AST) for writing COBOL data structures from Spark rows. + * + * This object provides type aliases and case classes that define the structure for converting + * Spark Row data into COBOL format. The AST models the hierarchical structure of COBOL records, + * including primitive fields, groups, arrays, and fillers. + * + * Type aliases define getter functions for extracting data from Spark rows: + * - PrimitiveGetter extracts a single value from a row + * - GroupGetter extracts a nested row structure from a row + * - ArrayGetter extracts an array of values from a row + * + * Case classes represent different node types in the writer AST: + * - Filler represents unused space in COBOL records with a specified size + * - PrimitiveField represents a single COBOL primitive field with its getter function + * - PrimitiveDependeeField represents a primitive field that other fields depend on for their occurrence count + * - GroupField represents a COBOL group containing child fields with its getter function + * - PrimitiveArray represents an array of primitive values with optional depending-on semantics + * - GroupArray represents an array of group structures with optional depending-on semantics + * + * The depending-on fields support COBOL's OCCURS DEPENDING ON clause, where the actual number + * of array elements is determined by the value of another field at runtime. + */ object WriterAst { type PrimitiveGetter = Row => Any type GroupGetter = Row => Row @@ -30,7 +53,8 @@ object WriterAst { case class Filler(fillerSize: Int) extends WriterAst case class PrimitiveField(cobolField: Primitive, getter: PrimitiveGetter) extends WriterAst + case class PrimitiveDependeeField(spec: DependingOnField) extends WriterAst case class GroupField(children: Seq[WriterAst], cobolField: Group, getter: GroupGetter) extends WriterAst - case class PrimitiveArray(cobolField: Primitive, arrayGetter: ArrayGetter) extends WriterAst - case class GroupArray(groupField: GroupField, cobolField: Group, arrayGetter: ArrayGetter) extends WriterAst + case class PrimitiveArray(cobolField: Primitive, arrayGetter: ArrayGetter, dependingOn: Option[DependingOnField]) extends WriterAst + case class GroupArray(groupField: GroupField, cobolField: Group, arrayGetter: ArrayGetter, dependingOn: Option[DependingOnField]) extends WriterAst } diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidatorSuite.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidatorSuite.scala new file mode 100644 index 000000000..0d8cd85ae --- /dev/null +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidatorSuite.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol.source.parameters + +import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{FixedLength, VariableBlock} +import za.co.absa.cobrix.cobol.reader.parameters.{MultisegmentParameters, ReaderParameters} + +class CobolParametersValidatorSuite extends AnyWordSpec { + "validateParametersForWriting" should { + "detect validation issues" in { + val readParams = ReaderParameters( + recordFormat = VariableBlock, + occursMappings = Map("A" -> Map("B" -> 1)), + startOffset = 1, + fileEndOffset = 2, + multisegment = Some(MultisegmentParameters("SEG", None, Seq.empty, "", Map.empty, Map.empty)) + ) + + val ex = intercept[IllegalArgumentException] { + CobolParametersValidator.validateParametersForWriting(readParams) + } + + assert(ex.getMessage.contains("Writer validation issues: Only 'F' and 'V' values for 'record_format' are supported for writing, provided value: 'VB';")) + assert(ex.getMessage.contains("OCCURS mapping option ('occurs_mappings') is not supported for writing")) + assert(ex.getMessage.contains("'record_start_offset' and 'record_end_offset' are not supported for writing")) + assert(ex.getMessage.contains("'file_start_offset' and 'file_end_offset' are not supported for writing")) + assert(ex.getMessage.contains("Multi-segment options ('segment_field', 'segment_filter', etc) are not supported for writing")) + } + + "do not throw exceptions if the configuration is okay" in { + val readParams = ReaderParameters( + recordFormat = FixedLength, + variableSizeOccurs = true + ) + + CobolParametersValidator.validateParametersForWriting(readParams) + } + } + +} diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/NestedWriterSuite.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/NestedWriterSuite.scala index 92ea8107b..49b668457 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/NestedWriterSuite.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/NestedWriterSuite.scala @@ -19,11 +19,27 @@ package za.co.absa.cobrix.spark.cobol.writer import org.apache.hadoop.fs.Path import org.apache.spark.sql.SaveMode import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive} +import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser} import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase import za.co.absa.cobrix.spark.cobol.source.fixtures.{BinaryFileFixture, TextComparisonFixture} class NestedWriterSuite extends AnyWordSpec with SparkTestBase with BinaryFileFixture with TextComparisonFixture { - private val copybook = + private val copybookWithOccurs = + """ 01 RECORD. + | 05 ID PIC 9(2). + | 05 FILLER PIC 9(1). + | 05 NUMBERS PIC 9(2) OCCURS 0 TO 5. + | 05 PLACE. + | 10 COUNTRY-CODE PIC X(2). + | 10 CITY PIC X(10). + | 05 PEOPLE OCCURS 0 TO 3. + | 10 NAME PIC X(14). + | 10 FILLER PIC X(1). + | 10 PHONE-NUMBER PIC X(12). + |""".stripMargin + + private val copybookWithDependingOn = """ 01 RECORD. | 05 ID PIC 9(2). | 05 FILLER PIC 9(1). @@ -33,28 +49,175 @@ class NestedWriterSuite extends AnyWordSpec with SparkTestBase with BinaryFileFi | 05 PLACE. | 10 COUNTRY-CODE PIC X(2). | 10 CITY PIC X(10). - | 05 CNT2 PIC 9(1). + | 05 cnt-2 PIC 9(1). | 05 PEOPLE - | OCCURS 0 TO 3 DEPENDING ON CNT2. + | OCCURS 0 TO 3 DEPENDING ON cnt-2. | 10 NAME PIC X(14). | 10 FILLER PIC X(1). | 10 PHONE-NUMBER PIC X(12). |""".stripMargin + "getFieldDefinition" should { + "support alphanumeric PIC" in { + val copybookContents = + """ 01 RECORD. + 05 NAME PIC X(10). + """ + + val copybook = CopybookParser.parse(copybookContents) + val nameField = copybook.getFieldByName("NAME").asInstanceOf[Primitive] + + val actual = NestedRecordCombiner.getFieldDefinition(nameField) + + assert(actual == "X(10)") + } + + "support integral with COMP" in { + val copybookContents = + """ 01 RECORD. + 05 NAME PIC 9(10) USAGE IS COMP. + """ + + val copybook = CopybookParser.parse(copybookContents) + val nameField = copybook.getFieldByName("NAME").asInstanceOf[Primitive] + + val actual = NestedRecordCombiner.getFieldDefinition(nameField) + + assert(actual == "9(10) COMP-4") + } + + "support integral DISPLAY" in { + val copybookContents = + """ 01 RECORD. + 05 NAME PIC 9(10). + """ + + val copybook = CopybookParser.parse(copybookContents) + val nameField = copybook.getFieldByName("NAME").asInstanceOf[Primitive] + + val actual = NestedRecordCombiner.getFieldDefinition(nameField) + + assert(actual == "9(10) USAGE IS DISPLAY") + } + + "support decimal with COMP" in { + val copybookContents = + """ 01 RECORD. + 05 NAME PIC S9(5)V99 USAGE IS COMP. + """ + + val copybook = CopybookParser.parse(copybookContents) + val nameField = copybook.getFieldByName("NAME").asInstanceOf[Primitive] + + val actual = NestedRecordCombiner.getFieldDefinition(nameField) + + assert(actual == "S9(5)V99 COMP-4") + } + + "support decimal DISPLAY" in { + val copybookContents = + """ 01 RECORD. + 05 NAME PIC S9(5)V99 USAGE IS DISPLAY. + """ + + val copybook = CopybookParser.parse(copybookContents) + val nameField = copybook.getFieldByName("NAME").asInstanceOf[Primitive] + + val actual = NestedRecordCombiner.getFieldDefinition(nameField) + + assert(actual == "S9(5)V99 USAGE IS DISPLAY") + } + } + "writer" should { - "write the dataframe according to the copybook" in { - //val parsedCopybook = CopybookParser.parse(copybook) + "write the dataframe with OCCURS" in { + val exampleJsons = Seq( + """{"ID":1,"NUMBERS":[10,20,30],"PLACE":{"COUNTRY_CODE":"US","CITY":"New York"},"PEOPLE":[{"NAME":"John Doe","PHONE_NUMBER":"555-1234"},{"NAME": "Jane Smith","PHONE_NUMBER":"555-5678"}]}""", + """{"ID":2,"NUMBERS":[],"PLACE":{"COUNTRY_CODE":"ZA","CITY":"Cape Town"},"PEOPLE":[{"NAME":"Test User","PHONE_NUMBER":"555-1235"}]}""" + ) + + import spark.implicits._ + + val df = spark.read.json(exampleJsons.toDS()) + .select("ID", "NUMBERS", "PLACE", "PEOPLE") + + withTempDirectory("cobol_writer1") { tempDir => + val path = new Path(tempDir, "writer1") + + df.coalesce(1) + .orderBy("id") + .write + .format("cobol") + .mode(SaveMode.Overwrite) + .option("copybook_contents", copybookWithOccurs) + .option("record_format", "V") + .option("is_rdw_big_endian", "true") + .option("is_rdw_part_of_record_length", "false") + .save(path.toString) + + // val df2 = spark.read.format("cobol") + // .option("copybook_contents", copybookWithOccurs) + // .option("record_format", "V") + // .option("is_rdw_big_endian", "true") + // .option("is_rdw_part_of_record_length", "false") + // .load(path.toString) + // + // println(SparkUtils.convertDataFrameToPrettyJSON(df2)) + + val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration) + + assert(fs.exists(path), "Output directory should exist") + val files = fs.listStatus(path) + .filter(_.getPath.getName.startsWith("part-")) + assert(files.nonEmpty, "Output directory should contain part files") + + val partFile = files.head.getPath + val data = fs.open(partFile) + val bytes = new Array[Byte](files.head.getLen.toInt) + data.readFully(bytes) + data.close() + + // Expected EBCDIC data for sample test data + val expected = Array( + 0x00, 0x6A, 0x00, 0x00, // RDW record 0 + 0xF0, 0xF1, 0x00, 0xF1, 0xF0, 0xF2, 0xF0, 0xF3, 0xF0, 0x00, 0x00, 0x00, 0x00, 0xE4, 0xE2, 0xD5, 0x85, + 0xA6, 0x40, 0xE8, 0x96, 0x99, 0x92, 0x40, 0x40, 0xD1, 0x96, 0x88, 0x95, 0x40, 0xC4, 0x96, 0x85, 0x40, + 0x40, 0x40, 0x40, 0x40, 0x40, 0x00, 0xF5, 0xF5, 0xF5, 0xCA, 0xF1, 0xF2, 0xF3, 0xF4, 0x40, 0x40, 0x40, 0x40, + 0xD1, 0x81, 0x95, 0x85, 0x40, 0xE2, 0x94, 0x89, 0xA3, 0x88, 0x40, 0x40, 0x40, 0x40, 0x00, 0xF5, 0xF5, 0xF5, + 0xCA, 0xF5, 0xF6, 0xF7, 0xF8, 0x40, 0x40, 0x40, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x6A, 0x00, 0x00, // RDW record 1 + 0xF0, 0xF2, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xE9, 0xC1, 0xC3, 0x81, + 0x97, 0x85, 0x40, 0xE3, 0x96, 0xA6, 0x95, 0x40, 0xE3, 0x85, 0xA2, 0xA3, 0x40, 0xE4, 0xA2, 0x85, 0x99, + 0x40, 0x40, 0x40, 0x40, 0x40, 0x00, 0xF5, 0xF5, 0xF5, 0xCA, 0xF1, 0xF2, 0xF3, 0xF5, 0x40, 0x40, 0x40, 0x40, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 + ).map(_.toByte) + + if (!bytes.sameElements(expected)) { + println(s"Expected bytes: ${expected.map("%02X" format _).mkString(" ")}") + println(s"Actual bytes: ${bytes.map("%02X" format _).mkString(" ")}") + //println(s"Actual bytes: ${bytes.map("0x%02X" format _).mkString(", ")}") + + assert(bytes.sameElements(expected), "Written data should match expected EBCDIC encoding") + } + } + } + + "write the dataframe with OCCURS DEPENDING ON" in { + //val parsedCopybook = CopybookParser.parse(copybookWithDependingOn) //println(parsedCopybook.generateRecordLayoutPositions()) val exampleJsons = Seq( - """{"ID":1,"cnt1":3,"NUMBERS":[10,20,30],"PLACE":{"COUNTRY_CODE":"US","CITY":"New York"},"CNT2":2,"PEOPLE":[{"NAME":"John Doe","PHONE_NUMBER":"555-1234"},{"NAME": "Jane Smith","PHONE_NUMBER":"555-5678"}]}""", - """{"ID":2,"cnt1":0,"NUMBERS":[],"PLACE":{"COUNTRY_CODE":"ZA","CITY":"Cape Town"},"CNT2":1,"PEOPLE":[{"NAME":"Test User","PHONE_NUMBER":"555-1235"}]}""" + """{"ID":1,"cnt1":3,"NUMBERS":[10,20,30],"PLACE":{"COUNTRY_CODE":"US","CITY":"New York"},"PEOPLE":[{"NAME":"John Doe","PHONE_NUMBER":"555-1234"},{"NAME": "Jane Smith","PHONE_NUMBER":"555-5678"}]}""", + """{"ID":2,"cnt1":0,"NUMBERS":[],"PLACE":{"COUNTRY_CODE":"ZA","CITY":"Cape Town"},"PEOPLE":[{"NAME":"Test User","PHONE_NUMBER":"555-1235"}]}""" ) import spark.implicits._ val df = spark.read.json(exampleJsons.toDS()) - .select("ID", "cnt1", "NUMBERS", "PLACE", "CNT2", "PEOPLE") + .select("ID", "cnt1", "NUMBERS", "PLACE", "PEOPLE") // df.printSchema() // df.show() @@ -67,22 +230,22 @@ class NestedWriterSuite extends AnyWordSpec with SparkTestBase with BinaryFileFi //) //dfWithDump.select("record_dump").show(truncate = false) - withTempDirectory("cobol_writer1") { tempDir => - val path = new Path(tempDir, "writer1") + withTempDirectory("cobol_writer2") { tempDir => + val path = new Path(tempDir, "writer2") df.coalesce(1) .orderBy("id") .write .format("cobol") .mode(SaveMode.Overwrite) - .option("copybook_contents", copybook) + .option("copybook_contents", copybookWithDependingOn) .option("record_format", "V") .option("is_rdw_big_endian", "false") .option("is_rdw_part_of_record_length", "true") .save(path.toString) //val df2 = spark.read.format("cobol") - // .option("copybook_contents", copybook) + // .option("copybook_contents", copybookWithDependingOn) // .option("record_format", "V") // .option("is_rdw_big_endian", "false") // .option("is_rdw_part_of_record_length", "true") @@ -129,5 +292,189 @@ class NestedWriterSuite extends AnyWordSpec with SparkTestBase with BinaryFileFi } } } + + "write the dataframe with OCCURS DEPENDING ON and variable length occurs" in { + val exampleJsons = Seq( + """{"ID":1,"NUMBERS":[10,20,30],"PLACE":{"COUNTRY_CODE":"US","CITY":"New York"},"PEOPLE":[{"NAME":"John Doe","PHONE_NUMBER":"555-1234"},{"NAME": "Jane Smith","PHONE_NUMBER":"555-5678"}]}""", + """{"ID":2,"NUMBERS":[],"PLACE":{"COUNTRY_CODE":"ZA","CITY":"Cape Town"},"PEOPLE":[{"NAME":"Test User","PHONE_NUMBER":"555-1235"}]}""" + ) + + import spark.implicits._ + + val df = spark.read.json(exampleJsons.toDS()) + .select("ID", "NUMBERS", "PLACE", "PEOPLE") + + withTempDirectory("cobol_writer3") { tempDir => + val path = new Path(tempDir, "writer3") + + df.coalesce(1) + .orderBy("id") + .write + .format("cobol") + .mode(SaveMode.Overwrite) + .option("copybook_contents", copybookWithDependingOn) + .option("record_format", "V") + .option("is_rdw_big_endian", "false") + .option("is_rdw_part_of_record_length", "true") + .option("variable_size_occurs", "true") + .save(path.toString) + + // val df2 = spark.read.format("cobol") + // .option("copybook_contents", copybookWithDependingOn) + // .option("record_format", "V") + // .option("is_rdw_big_endian", "false") + // .option("is_rdw_part_of_record_length", "true") + // .option("variable_size_occurs", "true") + // .load(path.toString) + // println(SparkUtils.convertDataFrameToPrettyJSON(df2)) + + val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration) + + assert(fs.exists(path), "Output directory should exist") + val files = fs.listStatus(path) + .filter(_.getPath.getName.startsWith("part-")) + assert(files.nonEmpty, "Output directory should contain part files") + + val partFile = files.head.getPath + val data = fs.open(partFile) + val bytes = new Array[Byte](files.head.getLen.toInt) + data.readFully(bytes) + data.close() + + // Expected EBCDIC data for sample test data + val expected = Array( + 0x51, 0x00, 0x00, 0x00, // RDW record 0 + 0xF0, 0xF1, 0x00, 0xF3, 0xF1, 0xF0, 0xF2, 0xF0, 0xF3, 0xF0, 0xE4, 0xE2, 0xD5, 0x85, 0xA6, 0x40, 0xE8, 0x96, + 0x99, 0x92, 0x40, 0x40, 0xF2, 0xD1, 0x96, 0x88, 0x95, 0x40, 0xC4, 0x96, 0x85, 0x40, 0x40, 0x40, 0x40, 0x40, + 0x40, 0x00, 0xF5, 0xF5, 0xF5, 0xCA, 0xF1, 0xF2, 0xF3, 0xF4, 0x40, 0x40, 0x40, 0x40, 0xD1, 0x81, 0x95, 0x85, + 0x40, 0xE2, 0x94, 0x89, 0xA3, 0x88, 0x40, 0x40, 0x40, 0x40, 0x00, 0xF5, 0xF5, 0xF5, 0xCA, 0xF5, 0xF6, 0xF7, + 0xF8, 0x40, 0x40, 0x40, 0x40, + 0x30, 0x00, 0x00, 0x00, // RDW record 1 + 0xF0, 0xF2, 0x00, 0xF0, 0xE9, 0xC1, 0xC3, 0x81, 0x97, 0x85, 0x40, 0xE3, 0x96, 0xA6, 0x95, 0x40, 0xF1, 0xE3, + 0x85, 0xA2, 0xA3, 0x40, 0xE4, 0xA2, 0x85, 0x99, 0x40, 0x40, 0x40, 0x40, 0x40, 0x00, 0xF5, 0xF5, 0xF5, 0xCA, + 0xF1, 0xF2, 0xF3, 0xF5, 0x40, 0x40, 0x40, 0x40 + ).map(_.toByte) + + if (!bytes.sameElements(expected)) { + println(s"Expected bytes: ${expected.map("%02X" format _).mkString(" ")}") + println(s"Actual bytes: ${bytes.map("%02X" format _).mkString(" ")}") + //println(s"Actual bytes: ${bytes.map("0x%02X" format _).mkString(", ")}") + + assert(bytes.sameElements(expected), "Written data should match expected EBCDIC encoding") + } + } + } + + "write the dataframe with OCCURS DEPENDING ON and variable length occurs and null values" in { + val exampleJsons = Seq( + """{"ID":1,"NUMBERS":[10,20,30],"PLACE":{"COUNTRY_CODE":"US","CITY":"New York"}}""", + """{"ID":2,"PLACE":{"COUNTRY_CODE":"ZA","CITY":"Cape Town"},"PEOPLE":[{"NAME":"Test User","PHONE_NUMBER":"555-1235"}]}""" + ) + + import spark.implicits._ + + val df = spark.read.json(exampleJsons.toDS()) + .select("ID", "NUMBERS", "PLACE", "PEOPLE") + + withTempDirectory("cobol_writer3") { tempDir => + val path = new Path(tempDir, "writer3") + + df.coalesce(1) + .orderBy("id") + .write + .format("cobol") + .mode(SaveMode.Overwrite) + .option("copybook_contents", copybookWithDependingOn) + .option("record_format", "V") + .option("is_rdw_big_endian", "false") + .option("is_rdw_part_of_record_length", "true") + .option("variable_size_occurs", "true") + .save(path.toString) + +// val df2 = spark.read.format("cobol") +// .option("copybook_contents", copybookWithDependingOn) +// .option("record_format", "V") +// .option("is_rdw_big_endian", "false") +// .option("is_rdw_part_of_record_length", "true") +// .option("variable_size_occurs", "true") +// .load(path.toString) +// println(SparkUtils.convertDataFrameToPrettyJSON(df2)) + + val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration) + + assert(fs.exists(path), "Output directory should exist") + val files = fs.listStatus(path) + .filter(_.getPath.getName.startsWith("part-")) + assert(files.nonEmpty, "Output directory should contain part files") + + val partFile = files.head.getPath + val data = fs.open(partFile) + val bytes = new Array[Byte](files.head.getLen.toInt) + data.readFully(bytes) + data.close() + + // Expected EBCDIC data for sample test data + val expected = Array( + 0x1B, 0x00, 0x00, 0x00, // RDW record 0 + 0xF0, 0xF1, 0x00, 0xF3, 0xF1, 0xF0, 0xF2, 0xF0, 0xF3, 0xF0, 0xE4, 0xE2, 0xD5, 0x85, 0xA6, 0x40, 0xE8, 0x96, + 0x99, 0x92, 0x40, 0x40, 0xF0, + 0x30, 0x00, 0x00, 0x00, + 0xF0, 0xF2, 0x00, 0xF0, 0xE9, 0xC1, 0xC3, 0x81, 0x97, 0x85, 0x40, 0xE3, 0x96, 0xA6, 0x95, 0x40, 0xF1, 0xE3, + 0x85, 0xA2, 0xA3, 0x40, 0xE4, 0xA2, 0x85, 0x99, 0x40, 0x40, 0x40, 0x40, 0x40, 0x00, 0xF5, 0xF5, 0xF5, 0xCA, + 0xF1, 0xF2, 0xF3, 0xF5, 0x40, 0x40, 0x40, 0x40 + ).map(_.toByte) + + if (!bytes.sameElements(expected)) { + println(s"Expected bytes: ${expected.map("%02X" format _).mkString(" ")}") + println(s"Actual bytes: ${bytes.map("%02X" format _).mkString(" ")}") + println(s"Actual bytes: ${bytes.map("0x%02X" format _).mkString(", ")}") + + assert(bytes.sameElements(expected), "Written data should match expected EBCDIC encoding") + } + } + } + } + + "constructWriterAst" should { + "fail on duplicate depending on fields" in { + val copybook = + """ 01 RECORD. + | 05 ID PIC 9(2). + | 05 FILLER PIC 9(1). + | 05 CNT1 PIC 9(1). + | 05 NUMBERS PIC 9(2) + | OCCURS 0 TO 5 DEPENDING ON CNT1. + | 05 PLACE. + | 10 COUNTRY-CODE PIC X(2). + | 10 CITY PIC X(10). + | 05 CNT1 PIC 9(1). + | 05 PEOPLE + | OCCURS 0 TO 3 DEPENDING ON CNT1. + | 10 NAME PIC X(14). + | 10 FILLER PIC X(1). + | 10 PHONE-NUMBER PIC X(12). + |""".stripMargin + val exampleJsons = Seq( + """{"ID":1,"NUMBERS":[10,20,30],"PLACE":{"COUNTRY_CODE":"US","CITY":"New York"},"PEOPLE":[{"NAME":"John Doe","PHONE_NUMBER":"555-1234"},{"NAME": "Jane Smith","PHONE_NUMBER":"555-5678"}]}""" + ) + + import spark.implicits._ + + val df = spark.read.json(exampleJsons.toDS()) + .select("ID", "NUMBERS", "PLACE", "PEOPLE") + + val parsedCopybook: Copybook = CopybookParser.parse(copybook) + val ast = parsedCopybook.ast + val children = ast.children.head.asInstanceOf[Group].children + val cnt2 = children(5).asInstanceOf[Primitive].withUpdatedIsDependee(true) + children(5) = cnt2 + + val ex = intercept[IllegalArgumentException] { + NestedRecordCombiner.constructWriterAst(parsedCopybook, df.schema) + } + + assert(ex.getMessage == "Duplicate field name 'CNT1' found in copybook. Field names must be unique (case-insensitive) when OCCURS DEPENDING ON is used. Already found a dependee field with the same name at line 4, current field line number: 10.") + } } + } diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/VariableLengthEbcdicWriterSuite.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/VariableLengthEbcdicWriterSuite.scala index 2e8a9c555..9544db3ff 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/VariableLengthEbcdicWriterSuite.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/VariableLengthEbcdicWriterSuite.scala @@ -146,15 +146,18 @@ class VariableLengthEbcdicWriterSuite extends AnyWordSpec with SparkTestBase wit .option("copybook_contents", copybookContents) .option("record_format", "FB") // Not supported .option("variable_size_occurs", "true") // Not supported + .option("occurs_mappings", "{\"DETAIL1\":{\"A\":0,\"B\":1},\"DETAIL2\":{\"A\":1,\"B\":2}}") .option("file_start_offset", "2") // Not supported .option("record_end_offset", "4") // Not supported + .option("segment_field", "A") .save(path.toString) } assert(exception.getMessage.contains("Writer validation issues: Only 'F' and 'V' values for 'record_format' are supported for writing, provided value: 'FB';")) - assert(exception.getMessage.contains("Variable size OCCURS ('variable_size_occurs = true') is not supported for writing")) + assert(exception.getMessage.contains("OCCURS mapping option ('occurs_mappings') is not supported for writing")) assert(exception.getMessage.contains("'record_start_offset' and 'record_end_offset' are not supported for writing")) assert(exception.getMessage.contains("'file_start_offset' and 'file_end_offset' are not supported for writing")) + assert(exception.getMessage.contains("Multi-segment options ('segment_field', 'segment_filter', etc) are not supported for writing")) } } }