@@ -5,6 +5,7 @@

 """Docling Haystack converter module."""

+import tempfile
 from abc import ABC, abstractmethod
 from enum import Enum
 from pathlib import Path
@@ -13,7 +14,10 @@
 from docling.chunking import BaseChunk, BaseChunker, HybridChunker
 from docling.datamodel.document import DoclingDocument
 from docling.document_converter import DocumentConverter
-from haystack import Document, component
+from haystack import Document, component, default_from_dict, default_to_dict, logging
+from haystack.dataclasses.byte_stream import ByteStream
+
+logger = logging.getLogger(__name__)


 class ExportType(str, Enum):
@@ -100,42 +104,109 @@ def __init__( |
         )
         self._meta_extractor = meta_extractor or MetaExtractor()

+    def _handle_bytestream(self, bytestream: ByteStream) -> tuple[str, bool]:
+        """Persist a ByteStream to a temporary file; return its path and a temp flag."""
+        suffix = (
+            f".{bytestream.meta.get('file_extension', '')}"
+            if bytestream.meta.get("file_extension")
+            else None
+        )
+        temp_file = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
+        temp_file.write(bytestream.data)
+        temp_file.close()
+        return temp_file.name, True
+
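
For context, a rough sketch of how this new ByteStream path would be exercised. This is not part of the diff; `converter` stands in for an already-constructed `DoclingConverter`, and the file name is a placeholder. The `file_extension` meta key is what `_handle_bytestream` reads to pick a suffix for the temporary file, which lets Docling infer the input format:

```python
from haystack.dataclasses.byte_stream import ByteStream

# Placeholder file; any raw bytes plus a file_extension hint work the same way.
with open("report.pdf", "rb") as f:
    stream = ByteStream(data=f.read(), meta={"file_extension": "pdf"})

result = converter.run(paths=[stream])  # bytes are spilled to a temp file, then converted
print(len(result["documents"]))
```
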
     @component.output_types(documents=list[Document])
     def run(
         self,
-        paths: Iterable[Union[Path, str]],
+        paths: Iterable[Union[Path, str, ByteStream]],
     ):
         """Run the DoclingConverter.

         Args:
-            paths: The input document locations, either as local paths or URLs.
+            paths: The input document locations, as local paths, URLs, or ByteStream objects.

         Returns:
             list[Document]: The output Haystack Documents.
         """
         documents: list[Document] = []
-        for filepath in paths:
-            dl_doc = self._converter.convert(
-                source=filepath,
-                **self._convert_kwargs,
-            ).document
-
-            if self._export_type == ExportType.DOC_CHUNKS:
-                chunk_iter = self._chunker.chunk(dl_doc=dl_doc)
-                hs_docs = [
-                    Document(
-                        content=self._chunker.serialize(chunk=chunk),
-                        meta=self._meta_extractor.extract_chunk_meta(chunk=chunk),
+        temp_files = []  # Track temporary files to clean up later
+
+        try:
+            for source in paths:
+                try:
+                    if isinstance(source, ByteStream):
+                        filepath, is_temp = self._handle_bytestream(source)
+                        if is_temp:
+                            temp_files.append(filepath)
+                    else:
+                        filepath = str(source)
+
+                    dl_doc = self._converter.convert(
+                        source=filepath,
+                        **self._convert_kwargs,
+                    ).document
+
+                    if self._export_type == ExportType.DOC_CHUNKS:
+                        chunk_iter = self._chunker.chunk(dl_doc=dl_doc)
+                        hs_docs = [
+                            Document(
+                                content=self._chunker.serialize(chunk=chunk),
+                                meta=self._meta_extractor.extract_chunk_meta(
+                                    chunk=chunk
+                                ),
+                            )
+                            for chunk in chunk_iter
+                        ]
+                        documents.extend(hs_docs)
+                    elif self._export_type == ExportType.MARKDOWN:
+                        hs_doc = Document(
+                            content=dl_doc.export_to_markdown(**self._md_export_kwargs),
+                            meta=self._meta_extractor.extract_dl_doc_meta(
+                                dl_doc=dl_doc
+                            ),
+                        )
+                        documents.append(hs_doc)
+                    else:
+                        raise RuntimeError(
+                            f"Unexpected export type: {self._export_type}"
+                        )
+                except Exception as e:
+                    logger.warning(
+                        "Could not process {source}. Skipping it. Error: {error}",
+                        source=source,
+                        error=e,
                     )
-                    for chunk in chunk_iter
-                ]
-                documents.extend(hs_docs)
-            elif self._export_type == ExportType.MARKDOWN:
-                hs_doc = Document(
-                    content=dl_doc.export_to_markdown(**self._md_export_kwargs),
-                    meta=self._meta_extractor.extract_dl_doc_meta(dl_doc=dl_doc),
-                )
-                documents.append(hs_doc)
-            else:
-                raise RuntimeError(f"Unexpected export type: {self._export_type}")
-        return {"documents": documents}
+            return {"documents": documents}
+        finally:  # always clean up temp files, even if a conversion failed
+            for temp_file in temp_files:
+                try:
+                    Path(temp_file).unlink()
+                except Exception as e:
+                    logger.debug(f"Failed to delete temporary file {temp_file}: {e}")
+
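
A note on the new error handling: `run` now logs and skips a failing source instead of aborting the whole batch. A hedged sketch of what that looks like from a pipeline; the store/writer wiring is standard Haystack 2.x and not part of this diff, and the file names are placeholders:

```python
from haystack import Pipeline
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore

store = InMemoryDocumentStore()
pipe = Pipeline()
pipe.add_component("converter", DoclingConverter())  # assumes the default export type
pipe.add_component("writer", DocumentWriter(document_store=store))
pipe.connect("converter.documents", "writer.documents")

# The unreadable source is logged and skipped; the valid one is still indexed.
pipe.run({"converter": {"paths": ["good.pdf", "missing.pdf"]}})
```
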
+    def to_dict(self) -> dict[str, Any]:
+        """
+        Serialize the component to a dictionary for pipeline persistence.
+
+        Returns:
+            dict[str, Any]: A dictionary representation of the component.
+        """
+        return default_to_dict(
+            self,
+            convert_kwargs=self._convert_kwargs,
+            md_export_kwargs=self._md_export_kwargs,
+        )
+
+    @classmethod
+    def from_dict(cls, data: dict[str, Any]) -> "DoclingConverter":
+        """
+        Deserialize the component from a dictionary.
+
+        Args:
+            data: Dictionary representation of the component.
+
+        Returns:
+            DoclingConverter: A new instance of the component.
+        """
+        return default_from_dict(cls, data)
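
One observation on the serialization pair: only `convert_kwargs` and `md_export_kwargs` are persisted, so a custom `export_type`, chunker, or meta extractor reverts to its default after a round trip. A minimal sketch of that round trip, assuming the constructor accepts these kwargs (implied by `default_from_dict`); the `image_placeholder` value is illustrative:

```python
converter = DoclingConverter(md_export_kwargs={"image_placeholder": ""})
data = converter.to_dict()
# default_to_dict produces {"type": "<module>.DoclingConverter",
#                           "init_parameters": {"convert_kwargs": ..., "md_export_kwargs": ...}}
restored = DoclingConverter.from_dict(data)
assert restored._md_export_kwargs == {"image_placeholder": ""}
```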