Skip to content

Commit a90ce48

Browse files
David Li authored and kou committed
ARROW-4213: [Flight] Fix incompatibilities between C++ and Java
I'm willing to update the patches here in case you want to resolve incompatibilities differently.

The changes are:
- In C++, send/read the schema as the first message in a DoGet stream.
- In Java, encode the GetFlightInfo schema in a Flatbuffer Message payload.
- In C++, skip null columns when encoding IpcPayload.
- In C++, don't write the body tag when encoding IpcPayload if no buffers are present.
- In Java, always align buffers when serializing record batches.

Additionally:
- Add integration tests for Flight. They will fail when trying to test record batches with dictionaries, but otherwise pass.
- Generate an uberjar for Flight, and include Apache Commons CLI and gRPC in the uberjar.
- Explicitly add the generated gRPC/Protobuf sources to the Java build - for some reason, Maven was not picking them up.
- In Java FlightClient, if there's an exception, use it to resolve the VectorSchemaRoot CompletableFuture so that clients do not hang forever.

Author: David Li <[email protected]>
Author: Wes McKinney <[email protected]>

Closes apache#3477 from lihalite/arrow-4213 and squashes the following commits:

d5a3ebf <Wes McKinney> Fix comment re: flight integration testing procedure
ac337a2 <David Li> Update description of integration test client
f867959 <David Li> Fix Flake8 errors
8c7be86 <David Li> clang-format new code
142b97d <David Li> Allow integration test script to control Flight port
3aace90 <David Li> Properly wait for Flight integration test server to start
0d0a476 <David Li> Clean up style issues
4c83485 <David Li> Collect failures at end of integration tests
0f3cce4 <David Li> Always align buffers when writing record batches in Flight
3c057df <David Li> Implement Java server side of Flight integration tests
13c0700 <David Li> Set exception on root in FlightClient
4801257 <David Li> Assume schema in Flight GetInfo is encoded in a Message payload
321f898 <David Li> Include gRPC in Flight uberjar
d23a201 <David Li> Explicitly compile generated Protobuf sources for Flight
6885d34 <David Li> Don't write body tag when serializing IpcPayload if no buffers
92c4c13 <David Li> Skeleton of integration test server for Flight
9bf8525 <David Li> Fix segfault in Flight when sending nullary columns
aca752b <David Li> Read schemas from stream in Flight DoGet
2acc8be <David Li> Implement arrow::ipc::ReadSchema that works with a Message
1 parent 8db2cdf commit a90ce48

File tree

7 files changed

+361
-9
lines changed

7 files changed

+361
-9
lines changed

flight/pom.xml

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,19 +48,16 @@
4848
<groupId>io.grpc</groupId>
4949
<artifactId>grpc-netty</artifactId>
5050
<version>${dep.grpc.version}</version>
51-
<scope>provided</scope>
5251
</dependency>
5352
<dependency>
5453
<groupId>io.grpc</groupId>
5554
<artifactId>grpc-core</artifactId>
5655
<version>${dep.grpc.version}</version>
57-
<scope>provided</scope>
5856
</dependency>
5957
<dependency>
6058
<groupId>io.grpc</groupId>
6159
<artifactId>grpc-protobuf</artifactId>
6260
<version>${dep.grpc.version}</version>
63-
<scope>provided</scope>
6461
</dependency>
6562
<dependency>
6663
<groupId>io.netty</groupId>
@@ -75,11 +72,15 @@
7572
<groupId>com.google.guava</groupId>
7673
<artifactId>guava</artifactId>
7774
</dependency>
75+
<dependency>
76+
<groupId>commons-cli</groupId>
77+
<artifactId>commons-cli</artifactId>
78+
<version>1.4</version>
79+
</dependency>
7880
<dependency>
7981
<groupId>io.grpc</groupId>
8082
<artifactId>grpc-stub</artifactId>
8183
<version>${dep.grpc.version}</version>
82-
<scope>provided</scope>
8384
</dependency>
8485
<dependency>
8586
<groupId>com.google.protobuf</groupId>
@@ -225,6 +226,43 @@
225226
</execution>
226227
</executions>
227228
</plugin>
229+
<plugin> <!-- add generated sources to classpath -->
230+
<groupId>org.codehaus.mojo</groupId>
231+
<artifactId>build-helper-maven-plugin</artifactId>
232+
<version>1.9.1</version>
233+
<executions>
234+
<execution>
235+
<id>add-generated-sources-to-classpath</id>
236+
<phase>generate-sources</phase>
237+
<goals>
238+
<goal>add-source</goal>
239+
</goals>
240+
<configuration>
241+
<sources>
242+
<source>${project.build.directory}/generated-sources/protobuf</source>
243+
</sources>
244+
</configuration>
245+
</execution>
246+
</executions>
247+
</plugin>
248+
<plugin>
249+
<artifactId>maven-assembly-plugin</artifactId>
250+
<version>3.0.0</version>
251+
<configuration>
252+
<descriptorRefs>
253+
<descriptorRef>jar-with-dependencies</descriptorRef>
254+
</descriptorRefs>
255+
</configuration>
256+
<executions>
257+
<execution>
258+
<id>make-assembly</id>
259+
<phase>package</phase>
260+
<goals>
261+
<goal>single</goal>
262+
</goals>
263+
</execution>
264+
</executions>
265+
</plugin>
228266
</plugins>
229267
</build>
230268
</project>

flight/src/main/java/org/apache/arrow/flight/ArrowMessage.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.io.InputStream;
2323
import java.io.OutputStream;
2424
import java.nio.ByteBuffer;
25+
import java.util.ArrayList;
26+
import java.util.Arrays;
2527
import java.util.List;
2628

2729
import org.apache.arrow.flatbuf.Message;
@@ -52,10 +54,12 @@
5254
import io.grpc.MethodDescriptor.Marshaller;
5355
import io.grpc.internal.ReadableBuffer;
5456
import io.grpc.protobuf.ProtoUtils;
57+
5558
import io.netty.buffer.ArrowBuf;
5659
import io.netty.buffer.ByteBuf;
5760
import io.netty.buffer.ByteBufInputStream;
5861
import io.netty.buffer.CompositeByteBuf;
62+
import io.netty.buffer.Unpooled;
5963

6064
/**
6165
* The in-memory representation of FlightData used to manage a stream of Arrow messages.
@@ -95,6 +99,18 @@ public static HeaderType getHeader(byte b) {
9599

96100
}
97101

102+
// Pre-allocated buffers for padding serialized ArrowMessages.
103+
private static List<ByteBuf> PADDING_BUFFERS = Arrays.asList(
104+
null,
105+
Unpooled.copiedBuffer(new byte[] { 0 }),
106+
Unpooled.copiedBuffer(new byte[] { 0, 0 }),
107+
Unpooled.copiedBuffer(new byte[] { 0, 0, 0 }),
108+
Unpooled.copiedBuffer(new byte[] { 0, 0, 0, 0 }),
109+
Unpooled.copiedBuffer(new byte[] { 0, 0, 0, 0, 0 }),
110+
Unpooled.copiedBuffer(new byte[] { 0, 0, 0, 0, 0, 0 }),
111+
Unpooled.copiedBuffer(new byte[] { 0, 0, 0, 0, 0, 0, 0 })
112+
);
113+
98114
private final FlightDescriptor descriptor;
99115
private final Message message;
100116
private final List<ArrowBuf> bufs;
@@ -253,8 +269,17 @@ private InputStream asInputStream(BufferAllocator allocator) {
253269
cos.writeTag(FlightData.DATA_BODY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
254270

255271
int size = 0;
272+
List<ByteBuf> allBufs = new ArrayList<>();
256273
for (ArrowBuf b : bufs) {
274+
allBufs.add(b);
257275
size += b.readableBytes();
276+
// [ARROW-4213] These buffers must be aligned to an 8-byte boundary in order to be readable from C++.
277+
if (b.readableBytes() % 8 != 0) {
278+
int paddingBytes = 8 - (b.readableBytes() % 8);
279+
assert paddingBytes > 0 && paddingBytes < 8;
280+
size += paddingBytes;
281+
allBufs.add(PADDING_BUFFERS.get(paddingBytes).retain());
282+
}
258283
}
259284
// rawvarint is used for length definition.
260285
cos.writeUInt32NoTag(size);
@@ -263,7 +288,7 @@ private InputStream asInputStream(BufferAllocator allocator) {
263288
ArrowBuf initialBuf = allocator.buffer(baos.size());
264289
initialBuf.writeBytes(baos.toByteArray());
265290
final CompositeByteBuf bb = new CompositeByteBuf(allocator.getAsByteBufAllocator(), true, bufs.size() + 1,
266-
ImmutableList.<ByteBuf>builder().add(initialBuf).addAll(bufs).build());
291+
ImmutableList.<ByteBuf>builder().add(initialBuf).addAll(allBufs).build());
267292
final ByteBufInputStream is = new DrainableByteBufInputStream(bb);
268293
return is;
269294
} catch (Exception ex) {

flight/src/main/java/org/apache/arrow/flight/FlightInfo.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,22 @@
1717

1818
package org.apache.arrow.flight;
1919

20+
import java.io.ByteArrayOutputStream;
21+
import java.io.IOException;
22+
import java.nio.ByteBuffer;
23+
import java.nio.channels.Channels;
2024
import java.util.List;
2125
import java.util.stream.Collectors;
2226

2327
import org.apache.arrow.flight.impl.Flight;
2428
import org.apache.arrow.flight.impl.Flight.FlightGetInfo;
29+
import org.apache.arrow.vector.ipc.ReadChannel;
30+
import org.apache.arrow.vector.ipc.WriteChannel;
31+
import org.apache.arrow.vector.ipc.message.MessageSerializer;
2532
import org.apache.arrow.vector.types.pojo.Schema;
2633

34+
import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
35+
2736
import com.google.common.collect.ImmutableList;
2837
import com.google.protobuf.ByteString;
2938

@@ -45,8 +54,15 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoin
4554
}
4655

4756
FlightInfo(FlightGetInfo flightGetInfo) {
48-
schema = flightGetInfo.getSchema().size() > 0 ?
49-
Schema.deserialize(flightGetInfo.getSchema().asReadOnlyByteBuffer()) : new Schema(ImmutableList.of());
57+
try {
58+
final ByteBuffer schemaBuf = flightGetInfo.getSchema().asReadOnlyByteBuffer();
59+
schema = flightGetInfo.getSchema().size() > 0 ?
60+
MessageSerializer.deserializeSchema(
61+
new ReadChannel(Channels.newChannel(new ByteBufferBackedInputStream(schemaBuf))))
62+
: new Schema(ImmutableList.of());
63+
} catch (IOException e) {
64+
throw new RuntimeException(e);
65+
}
5066
descriptor = new FlightDescriptor(flightGetInfo.getFlightDescriptor());
5167
endpoints = flightGetInfo.getEndpointList().stream().map(t -> new FlightEndpoint(t)).collect(Collectors.toList());
5268
bytes = flightGetInfo.getTotalBytes();
@@ -74,9 +90,16 @@ public List<FlightEndpoint> getEndpoints() {
7490
}
7591

7692
FlightGetInfo toProtocol() {
93+
// Encode schema in a Message payload
94+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
95+
try {
96+
MessageSerializer.serialize(new WriteChannel(Channels.newChannel(baos)), schema);
97+
} catch (IOException e) {
98+
throw new RuntimeException(e);
99+
}
77100
return Flight.FlightGetInfo.newBuilder()
78101
.addAllEndpoint(endpoints.stream().map(t -> t.toProtocol()).collect(Collectors.toList()))
79-
.setSchema(ByteString.copyFrom(schema.toByteArray()))
102+
.setSchema(ByteString.copyFrom(baos.toByteArray()))
80103
.setFlightDescriptor(descriptor.toProtocol())
81104
.setTotalBytes(FlightInfo.this.bytes)
82105
.setTotalRecords(records)

flight/src/main/java/org/apache/arrow/flight/FlightService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ public boolean isCancelled() {
128128
@Override
129129
public void start(VectorSchemaRoot root) {
130130
responseObserver.onNext(new ArrowMessage(null, root.getSchema()));
131-
unloader = new VectorUnloader(root, true, false);
131+
// [ARROW-4213] We must align buffers to be compatible with other languages.
132+
unloader = new VectorUnloader(root, true, true);
132133
}
133134

134135
@Override

flight/src/main/java/org/apache/arrow/flight/FlightStream.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ public void onNext(ArrowMessage msg) {
178178
public void onError(Throwable t) {
179179
ex = t;
180180
queue.add(DONE_EX);
181+
root.setException(t);
181182
}
182183

183184
@Override
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.arrow.flight.example.integration;
19+
20+
import java.io.File;
21+
import java.io.FileOutputStream;
22+
import java.io.IOException;
23+
import java.util.List;
24+
25+
import org.apache.arrow.flight.FlightClient;
26+
import org.apache.arrow.flight.FlightDescriptor;
27+
import org.apache.arrow.flight.FlightEndpoint;
28+
import org.apache.arrow.flight.FlightInfo;
29+
import org.apache.arrow.flight.FlightStream;
30+
import org.apache.arrow.flight.Location;
31+
import org.apache.arrow.memory.BufferAllocator;
32+
import org.apache.arrow.memory.RootAllocator;
33+
import org.apache.arrow.vector.VectorSchemaRoot;
34+
import org.apache.arrow.vector.dictionary.DictionaryProvider;
35+
import org.apache.arrow.vector.ipc.ArrowFileWriter;
36+
import org.apache.commons.cli.CommandLine;
37+
import org.apache.commons.cli.CommandLineParser;
38+
import org.apache.commons.cli.DefaultParser;
39+
import org.apache.commons.cli.Options;
40+
import org.apache.commons.cli.ParseException;
41+
42+
/**
43+
* An Example Flight Server that provides access to the InMemoryStore.
44+
*/
45+
class IntegrationTestClient {
46+
private static final org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(IntegrationTestClient.class);
47+
private final Options options;
48+
49+
private IntegrationTestClient() {
50+
options = new Options();
51+
options.addOption("a", "arrow", true, "arrow file");
52+
options.addOption("j", "json", true, "json file");
53+
options.addOption("host", true, "The host to connect to.");
54+
options.addOption("port", true, "The port to connect to." );
55+
}
56+
57+
public static void main(String[] args) {
58+
try {
59+
new IntegrationTestClient().run(args);
60+
} catch (ParseException e) {
61+
fatalError("Invalid parameters", e);
62+
} catch (IOException e) {
63+
fatalError("Error accessing files", e);
64+
}
65+
}
66+
67+
static void fatalError(String message, Throwable e) {
68+
System.err.println(message);
69+
System.err.println(e.getMessage());
70+
LOGGER.error(message, e);
71+
System.exit(1);
72+
}
73+
74+
private void run(String[] args) throws ParseException, IOException {
75+
CommandLineParser parser = new DefaultParser();
76+
CommandLine cmd = parser.parse(options, args, false);
77+
78+
String fileName = cmd.getOptionValue("arrow");
79+
if (fileName == null) {
80+
throw new IllegalArgumentException("missing arrow file parameter");
81+
}
82+
File arrowFile = new File(fileName);
83+
if (arrowFile.exists()) {
84+
throw new IllegalArgumentException("arrow file already exists: " + arrowFile.getAbsolutePath());
85+
}
86+
87+
final String host = cmd.getOptionValue("host", "localhost");
88+
final int port = Integer.parseInt(cmd.getOptionValue("port", "31337"));
89+
90+
final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
91+
FlightClient client = new FlightClient(allocator, new Location(host, port));
92+
FlightInfo info = client.getInfo(FlightDescriptor.path(cmd.getOptionValue("json")));
93+
List<FlightEndpoint> endpoints = info.getEndpoints();
94+
if (endpoints.isEmpty()) {
95+
throw new RuntimeException("No endpoints returned from Flight server.");
96+
}
97+
98+
FlightStream stream = client.getStream(info.getEndpoints().get(0).getTicket());
99+
try (VectorSchemaRoot root = stream.getRoot();
100+
FileOutputStream fileOutputStream = new FileOutputStream(arrowFile);
101+
ArrowFileWriter arrowWriter = new ArrowFileWriter(root, new DictionaryProvider.MapDictionaryProvider(),
102+
fileOutputStream.getChannel())) {
103+
while (stream.next()) {
104+
arrowWriter.writeBatch();
105+
}
106+
}
107+
}
108+
}

0 commit comments

Comments
 (0)