Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/jacoco_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ jobs:
uses: actions/github-script@v6.4.1
with:
script: |
core.setFailed('Changed files coverage is less than ${{ matrix.changedCobolParserOverride }}% for 'cobol-parser'!')
core.setFailed("Changed files coverage is less than ${{ matrix.changedCobolParserOverride }}% for 'cobol-parser'!")
- name: Fail PR if changed files coverage for 'spark-cobol' is less than ${{ matrix.changed }}%
if: ${{ steps.jacoco.outputs.coverage-changed-files < matrix.changed }}
uses: actions/github-script@v6.4.1
with:
script: |
core.setFailed('Changed files coverage is less than ${{ matrix.changed }}% for 'spark-cobol!')
core.setFailed("Changed files coverage is less than ${{ matrix.changed }}% for 'spark-cobol'!")
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class Copybook(val ast: CopybookAST) extends Logging with Serializable {
def getFieldValueByName(fieldName: String, recordBytes: Array[Byte], startOffset: Int = 0): Any = {
val primitive = getPrimitiveFieldByName(fieldName)

extractPrimitiveField(primitive, recordBytes, startOffset)
getPrimitiveField(primitive, recordBytes, startOffset)
}

/**
Expand Down Expand Up @@ -203,23 +203,6 @@ class Copybook(val ast: CopybookAST) extends Logging with Serializable {
}
}

/**
* Get value of a field of the copybook record by the AST object of the field
*
* Nested field names can contain '.' to identify the exact field.
* If the field name is unique '.' is not required.
*
* @param field The AST object of the field
* @param bytes Binary encoded data of the record
* @param startOffset An offset to the beginning of the field in the data (in bytes).
* @return The value of the field
*
*/
def extractPrimitiveField(field: Primitive, bytes: Array[Byte], startOffset: Int = 0): Any = {
val slicedBytes = bytes.slice(field.binaryProperties.offset + startOffset, field.binaryProperties.offset + startOffset + field.binaryProperties.actualSize)
field.decodeTypeValue(0, slicedBytes)
}

/** This routine is used for testing by generating a layout position information to compare with mainframe output */
def generateRecordLayoutPositions(): String = {
var fieldCounter: Int = 0
Expand Down Expand Up @@ -431,6 +414,28 @@ object Copybook {
new Copybook(schema)
}

/**
 * Decodes the value of a primitive field from the binary data of a record,
 * given the AST object of the field.
 *
 * The bytes of the field are sliced out of `bytes`, starting at
 * `field.binaryProperties.offset + startOffset` and spanning
 * `field.binaryProperties.actualSize` bytes, and are then passed to
 * `field.decodeTypeValue`.
 *
 * @param field       The AST object of the field
 * @param bytes       Binary encoded data of the record
 * @param startOffset An offset (in bytes) added to the field's own offset when locating it in `bytes`.
 * @return The value of the field, as returned by `field.decodeTypeValue`
 *
 */
def getPrimitiveField(field: Primitive, bytes: Array[Byte], startOffset: Int = 0): Any = {
  val properties = field.binaryProperties
  val from = properties.offset + startOffset
  val fieldBytes = bytes.slice(from, from + properties.actualSize)

  // The slice already starts at the field, so decoding begins at position 0.
  field.decodeTypeValue(0, fieldBytes)
}

/**
 * Same as getPrimitiveField(). The original method name is left for backwards compatibility.
 *
 * All arguments are forwarded to getPrimitiveField() unchanged and its result is returned as is.
 *
 * @param field       The AST object of the field
 * @param bytes       Binary encoded data of the record
 * @param startOffset An offset (in bytes) forwarded to getPrimitiveField().
 * @return The value returned by getPrimitiveField()
 */
def extractPrimitiveField(field: Primitive, bytes: Array[Byte], startOffset: Int = 0): Any =
  getPrimitiveField(field, bytes, startOffset)

/**
* Set value of a field of the copybook record by the AST object of the field
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ case class Primitive(

/** Returns a string representation of the field */
override def toString: String = {
s"${" " * 2 * level}$camelCased ${camelCase(redefines.getOrElse(""))} $dataType"
s"${" " * 2 * level}$name ${redefines.getOrElse("")} $dataType"
}

/** Returns true if the field is a child segment */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ package za.co.absa.cobrix.cobol.parser.ast

/** Trait for Cobol copybook AST element (a statement). */
trait Statement {
val camelCased: String = {
camelCase(name)
}

/** Returns the level of the AST element */
def level: Int

Expand Down Expand Up @@ -94,15 +90,7 @@ trait Statement {

/** Returns a string representation of the AST element */
override def toString: String = {
s"${" " * 2 * level}$camelCased ${camelCase(redefines.getOrElse(""))}"
}

/** Returns the given name as a camel-cased string */
def camelCase(s: String): String = {
s.replace(".", "")
.split("-")
.map(c => c.toLowerCase.capitalize)
.mkString
s"${" " * 2 * level}$name ${redefines.getOrElse("")}"
}

/** Returns the original AST element with updated binary properties */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package za.co.absa.cobrix.cobol.reader.extractors.raw

import org.slf4j.LoggerFactory
import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.parser.ast.Primitive
import za.co.absa.cobrix.cobol.reader.iterator.RecordLengthExpression
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
Expand Down Expand Up @@ -123,7 +124,7 @@ class FixedWithRecordLengthExprRawRecordExtractor(ctx: RawRecordContext,

final private def getRecordLengthFromField(lengthAST: Primitive, binaryDataStart: Array[Byte]): Int = {
val length = if (isLengthMapEmpty) {
ctx.copybook.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match {
Copybook.getPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match {
case i: Int => i
case l: Long => l.toInt
case s: String => Try{ s.toInt }.getOrElse(throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type, encountered: '$s'."))
Expand All @@ -132,7 +133,7 @@ class FixedWithRecordLengthExprRawRecordExtractor(ctx: RawRecordContext,
case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.")
}
} else {
ctx.copybook.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match {
Copybook.getPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match {
case i: Int => getRecordLengthFromMapping(i.toString)
case l: Long => getRecordLengthFromMapping(l.toString)
case d: BigDecimal => getRecordLengthFromMapping(d.toString())
Expand Down Expand Up @@ -165,7 +166,7 @@ class FixedWithRecordLengthExprRawRecordExtractor(ctx: RawRecordContext,

expr.fields.foreach{
case (name, field) =>
val obj = ctx.copybook.extractPrimitiveField(field, binaryDataStart, readerProperties.startOffset)
val obj = Copybook.getPrimitiveField(field, binaryDataStart, readerProperties.startOffset)
try {
obj match {
case i: Int => evaluator.setValue(name, i)
Expand Down Expand Up @@ -194,7 +195,7 @@ class FixedWithRecordLengthExprRawRecordExtractor(ctx: RawRecordContext,

private def getSegmentId(data: Array[Byte]): Option[String] = {
segmentIdField.map(field => {
val fieldValue = ctx.copybook.extractPrimitiveField(field, data, readerProperties.startOffset)
val fieldValue = Copybook.getPrimitiveField(field, data, readerProperties.startOffset)
if (fieldValue == null) {
log.error(s"An unexpected null encountered for segment id at $byteIndex")
""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,14 @@ object IndexGenerator extends Logging {
} else {
if (isValid) {
if (isReallyHierarchical && rootRecordId.isEmpty) {
val curSegmentId = getSegmentId(copybook.get, segmentField.get, record)
val curSegmentId = getSegmentId(segmentField.get, record)
if ((curSegmentId.nonEmpty && rootSegmentIds.isEmpty)
|| (rootSegmentIds.nonEmpty && rootSegmentIds.contains(curSegmentId))) {
rootRecordId = curSegmentId
}
}
if (canSplit && needSplit(recordsInChunk, bytesInChunk)) {
if (!isReallyHierarchical || isSegmentGoodForSplit(rootSegmentIds, copybook.get, segmentField.get, record)) {
if (!isReallyHierarchical || isSegmentGoodForSplit(rootSegmentIds, segmentField.get, record)) {
val indexEntry = SparseIndexEntry(byteIndex, -1, fileId, recordIndex)
val len = index.length
// Do not add an entry if we are still at the same position as the previous entry.
Expand Down Expand Up @@ -157,15 +157,14 @@ object IndexGenerator extends Logging {
}

private def isSegmentGoodForSplit(rootSegmentIds: List[String],
copybook: Copybook,
segmentField: Primitive,
record: Array[Byte]): Boolean = {
val segmentId = getSegmentId(copybook, segmentField, record)
val segmentId = getSegmentId(segmentField, record)
rootSegmentIds.contains(segmentId)
}

private def getSegmentId(copybook: Copybook, segmentIdField: Primitive, data: Array[Byte]): String = {
val v = copybook.extractPrimitiveField(segmentIdField, data)
private def getSegmentId(segmentIdField: Primitive, data: Array[Byte]): String = {
val v = Copybook.getPrimitiveField(segmentIdField, data)
if (v == null) "" else v.toString.trim
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package za.co.absa.cobrix.cobol.reader.iterator

import za.co.absa.cobrix.cobol.internal.Logging
import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.reader.extractors.record.{RecordExtractors, RecordHandler}
import za.co.absa.cobrix.cobol.reader.parameters.{CorruptFieldsPolicy, ReaderParameters}
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
Expand Down Expand Up @@ -108,7 +109,7 @@ class FixedLenNestedRowIterator[T: ClassTag](

private def getSegmentId(data: Array[Byte], offset: Int): Option[String] = {
segmentIdField.map(field => {
val fieldValue = cobolSchema.copybook.extractPrimitiveField(field, data, offset)
val fieldValue = Copybook.getPrimitiveField(field, data, offset)
if (fieldValue == null) {
logger.error(s"An unexpected null encountered for segment id at $byteIndex")
""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ package za.co.absa.cobrix.cobol.reader.iterator

import za.co.absa.cobrix.cobol.internal.Logging
import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.parser.ast.Primitive
import za.co.absa.cobrix.cobol.parser.headerparsers.RecordHeaderParser
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
import za.co.absa.cobrix.cobol.reader.validator.ReaderParametersValidator

Expand Down Expand Up @@ -143,7 +142,7 @@ class VRLRecordReader(cobolSchema: Copybook,

private def getSegmentId(data: Array[Byte]): Option[String] = {
segmentIdField.map(field => {
val fieldValue = cobolSchema.extractPrimitiveField(field, data, readerProperties.startOffset)
val fieldValue = Copybook.getPrimitiveField(field, data, readerProperties.startOffset)
if (fieldValue == null) {
logger.error(s"An unexpected null encountered for segment id at $byteIndex")
""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package za.co.absa.cobrix.cobol.parser.extract

import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, CobolType}
import za.co.absa.cobrix.cobol.parser.ast.{BinaryProperties, Group, Primitive}
import za.co.absa.cobrix.cobol.parser.decoders.DecoderSelector
import za.co.absa.cobrix.cobol.parser.encoding.{EBCDIC, EncoderSelector}
import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy
import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser}

class BinaryExtractorSpec extends AnyFunSuite {

Expand Down Expand Up @@ -113,15 +113,15 @@ class BinaryExtractorSpec extends AnyFunSuite {
0x00.toByte, 0x00.toByte, 0x2F.toByte
)

val copybook = CopybookParser.parseTree(copyBookContents)
val copybook: Copybook = CopybookParser.parseTree(copyBookContents)
val startOffset: Int = 0

test("Test extract primitive field") {

// using getFieldByName
val statement = copybook.getFieldByName("ID")
val field: Primitive = statement.asInstanceOf[Primitive]
val result: Any = copybook.extractPrimitiveField(field, bytes, startOffset)
val result: Any = Copybook.getPrimitiveField(field, bytes, startOffset)
assert(result.asInstanceOf[Int] === 6)

// traverse AST and extract all primitives to map
Expand All @@ -130,7 +130,7 @@ class BinaryExtractorSpec extends AnyFunSuite {
def traverseAst(group: Group): Unit = {
for (child <- group.children) {
if (child.isInstanceOf[Primitive]) {
extractedData += (child.name -> copybook.extractPrimitiveField(child.asInstanceOf[Primitive],
extractedData += (child.name -> Copybook.extractPrimitiveField(child.asInstanceOf[Primitive],
bytes, startOffset))
} else {
assert(child.isInstanceOf[Group] === true)
Expand Down Expand Up @@ -162,7 +162,7 @@ class BinaryExtractorSpec extends AnyFunSuite {

val primitive: Primitive = Primitive(level, name, name, lineNumber, dataType, redefines, isRedefined,
occurs, to, dependingOn, Map(), isDependee, isFiller, DecoderSelector.getDecoder(dataType), EncoderSelector.getEncoder(dataType), binaryProperties)(None)
val result2: Any = copybook.extractPrimitiveField(primitive, bytes, startOffset)
val result2: Any = Copybook.extractPrimitiveField(primitive, bytes, startOffset)
assert(result2.asInstanceOf[String] === "EXAMPLE4")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package za.co.absa.cobrix.spark.cobol.source.parameters

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.slf4j.LoggerFactory
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat
import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser._
import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, ReaderParameters}
Expand All @@ -32,7 +32,18 @@ import scala.collection.mutable.ListBuffer
* This class provides methods for checking the Spark job options after parsed.
*/
object CobolParametersValidator {

private val log = LoggerFactory.getLogger(this.getClass)

/**
* Validates the provided COBOL processing parameters and throws an exception
* if any inconsistency or missing configuration is detected.
*
* This method checks only that a data source path is specified and that the copybook is defined and not ambiguous.
*
* @param params the COBOL processing parameters to be validated
* @return this method does not return any value. It throws an IllegalArgumentException if
* the validation fails.
*/
def checkSanity(params: CobolParameters): Unit = {
if (params.sourcePaths.isEmpty) {
throw new IllegalArgumentException("Data source path must be specified.")
Expand All @@ -43,12 +54,19 @@ object CobolParametersValidator {
}
}

def validateOrThrow(sparkConf: SparkConf, hadoopConf: Configuration): Unit = {
val parameters = Map[String, String](PARAM_COPYBOOK_PATH -> sparkConf.get(PARAM_COPYBOOK_PATH), PARAM_SOURCE_PATH -> sparkConf.get
(PARAM_SOURCE_PATH))
validateOrThrow(parameters, hadoopConf)
}

/**
* Validates the provided parameters for processing COBOL data and throws an exception
* if any inconsistency or issue is detected in the configurations.
*
* The method checks for the presence and validity of parameters required for data input
* and ensures that conflicting or missing configurations are handled correctly. It also
* verifies the validity and accessibility of copybook files when specified.
*
* @param parameters a map of configuration options to be validated.
* @param hadoopConf the Hadoop configuration object used to support filesystem operations
* when validating copybook paths in distributed storage systems.
* @return this method does not return any value. It throws an exception if any validation fails.
*/
def validateOrThrow(parameters: Map[String, String], hadoopConf: Configuration): Unit = {
val copyBookContents = parameters.get(PARAM_COPYBOOK_CONTENTS)
val copyBookPathFileName = parameters.get(PARAM_COPYBOOK_PATH)
Expand Down Expand Up @@ -116,6 +134,14 @@ object CobolParametersValidator {
}
}

/**
* Validates the parameters provided for writing operations in COBOL data processing.
* Verifies that the specified configurations are compatible with the writer and throws
* an exception in case of violations.
*
* @param readerParameters the configuration containing options to be validated for writing operations.
* @return this method does not return any value but throws an IllegalArgumentException if the validation fails.
*/
def validateParametersForWriting(readerParameters: ReaderParameters): Unit = {
val issues = new ListBuffer[String]

Expand All @@ -124,8 +150,13 @@ object CobolParametersValidator {
s"provided value: '${readerParameters.recordFormat}'"
}

if (readerParameters.variableSizeOccurs) {
issues += "Variable size OCCURS ('variable_size_occurs = true') is not supported for writing"
if (readerParameters.variableSizeOccurs &&
readerParameters.recordFormat == RecordFormat.FixedLength) {
log.warn("Option 'variable_size_occurs=true' is used with 'record_format=F' which means records can have variable length. It is highly recommended to use 'record_format=V' instead.")
}
Comment on lines +153 to +156
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reject variable_size_occurs for fixed-length writes.

This combination can emit variable-sized records while spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala:45-76 only adds RDW framing for RecordFormat.VariableLength. With record_format=F, downstream consumers lose unambiguous record boundaries, so this should fail validation instead of warning.

🛑 Suggested validation change
-    if (readerParameters.variableSizeOccurs &&
-      readerParameters.recordFormat == RecordFormat.FixedLength) {
-      log.warn("Option 'variable_size_occurs=true' is used with 'record_format=F' which means records can have variable length. It is highly recommended to use 'record_format=V' instead.")
-    }
+    if (readerParameters.variableSizeOccurs &&
+      readerParameters.recordFormat == RecordFormat.FixedLength) {
+      issues += "Option 'variable_size_occurs=true' requires 'record_format=V' for writing"
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala`
around lines 153 - 156, The validator currently only logs a warning when
readerParameters.variableSizeOccurs is true with readerParameters.recordFormat
== RecordFormat.FixedLength; change this to reject the configuration by
throwing/returning a validation error in CobolParametersValidator (instead of
log.warn) when variableSizeOccurs is set with RecordFormat.FixedLength,
referencing the same flags (variableSizeOccurs and recordFormat) and
RecordFormat.FixedLength to locate the check; include a clear error message
stating that variable-size occurs cannot be used with fixed-length writes
because RDW framing is only added for RecordFormat.VariableLength (see
NestedRecordCombiner handling), so validation must fail.


if (readerParameters.occursMappings.nonEmpty) {
issues += "OCCURS mapping option ('occurs_mappings') is not supported for writing"
}

if (readerParameters.startOffset != 0 || readerParameters.endOffset != 0) {
Expand All @@ -136,6 +167,10 @@ object CobolParametersValidator {
issues += "'file_start_offset' and 'file_end_offset' are not supported for writing"
}

if (readerParameters.multisegment.isDefined) {
issues += "Multi-segment options ('segment_field', 'segment_filter', etc) are not supported for writing"
}

if (issues.nonEmpty) {
throw new IllegalArgumentException(s"Writer validation issues: ${issues.mkString("; ")}")
}
Expand Down
Loading
Loading