@@ -312,7 +315,7 @@
mysql-8.0
- container-registry.oracle.com/mysql/community-server:8.0
+ mirror.gcr.io/mysql:8.0
@@ -321,6 +324,12 @@
mirror.gcr.io/library/mariadb:10.6
+
+ mysql-8.4
+
+ mirror.gcr.io/mysql:8.4
+
+
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java b/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java
index b9c491a4..d722fdac 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java
@@ -21,11 +21,11 @@
/**
* GTID set as described in GTID
- * Concepts of MySQL 5.6 Reference Manual.
+ * Concepts of MySQL 5.6 Reference Manual, with support for MySQL 8.3+ tagged GTID intervals.
*
*
* gtid_set: uuid_set[,uuid_set]...
- * uuid_set: uuid:interval[:interval]...
+ * uuid_set: uuid[:tag]:interval[:interval]...
* uuid: hhhhhhhh-hhhh-hhhh-hhhh-hhhhhhhhhhhh, h: [0-9|A-F]
* interval: n[-n], (n >= 1)
*
@@ -34,7 +34,7 @@
*/
public class GtidSet {
- private final Map map = new LinkedHashMap();
+ private final Map map = new LinkedHashMap();
public static GtidSet parse(String gtidStr) {
if ( MariadbGtidSet.isMariaGtidSet(gtidStr) ) {
@@ -45,30 +45,77 @@ public static GtidSet parse(String gtidStr) {
}
/**
* @param gtidSet gtid set comprised of closed intervals (like MySQL's executed_gtid_set).
+ * Supports legacy UUID intervals (uuid:intervals), MySQL tagged GTID-set intervals
+ * (uuid:tag:intervals), and the previous tag-first representation (tag:uuid:intervals).
*/
public GtidSet(String gtidSet) {
String[] uuidSets = (gtidSet == null || gtidSet.isEmpty()) ? new String[0] :
gtidSet.replace("\n", "").split(",");
for (String uuidSet : uuidSets) {
- int uuidSeparatorIndex = uuidSet.indexOf(":");
- UUID sourceId = UUID.fromString(uuidSet.substring(0, uuidSeparatorIndex));
- List intervals = new ArrayList();
- String[] rawIntervals = uuidSet.substring(uuidSeparatorIndex + 1).split(":");
- for (String interval : rawIntervals) {
- String[] is = interval.split("-");
- long[] split = new long[is.length];
- for (int i = 0, e = is.length; i < e; i++) {
- split[i] = Long.parseLong(is[i]);
- }
- if (split.length == 1) {
- split = new long[] {split[0], split[0]};
+ final String[] parts = uuidSet.split(":");
+
+ // MySQL tagged GTID sets identify intervals by TSID (uuid[:tag]).
+ // Also accept the previous tag-first representation (tag:uuid:intervals).
+ int uuidIndex = 0;
+ int intervalsStartIndex = 1;
+
+ // UUID format: 8-4-4-4-12 hex digits with dashes. If the first token is not a UUID,
+ // treat it as a tag from the previous tag-first representation.
+ if (parts.length >= 3 && !isValidUuidFormat(parts[0])) {
+ // Previous tag-first format: tag:uuid:intervals...
+ uuidIndex = 1;
+ intervalsStartIndex = 2;
+ }
+
+ final UUID sourceId = UUID.fromString(parts[uuidIndex]);
+ String tag = uuidIndex == 1 ? parts[0] : null;
+ for (int i = intervalsStartIndex; i < parts.length; i++) {
+ final String part = parts[i];
+ if (!isInterval(part)) {
+ tag = part;
+ continue;
}
- intervals.add(new Interval(split[0], split[1]));
+ addInterval(sourceId, tag, parseInterval(part));
}
- map.put(sourceId, new UUIDSet(sourceId, intervals));
}
}
+ private void addInterval(UUID sourceId, String tag, Interval interval) {
+ final Tsid tsid = new Tsid(sourceId, tag);
+ final UUIDSet existing = map.get(tsid);
+ final List intervals = existing == null ? new ArrayList() :
+ new ArrayList(existing.getIntervals());
+ intervals.add(interval);
+ map.put(tsid, new UUIDSet(sourceId, tag, intervals));
+ }
+
+ private static Interval parseInterval(final String interval) {
+ final String[] is = interval.split("-");
+ long[] split = new long[is.length];
+ for (int j = 0, e = is.length; j < e; j++) {
+ split[j] = Long.parseLong(is[j]);
+ }
+ if (split.length == 1) {
+ split = new long[] {split[0], split[0]};
+ }
+ return new Interval(split[0], split[1]);
+ }
+
+ private static boolean isInterval(final String str) {
+ return str.matches("[0-9]+(-[0-9]+)?");
+ }
+
+ /**
+ * Checks if a string matches the UUID format (8-4-4-4-12 hex digits).
+ *
+ * @param str the string to check
+ * @return true if the string is a valid UUID format, false otherwise
+ */
+ private static boolean isValidUuidFormat(final String str) {
+ // UUID format: 8-4-4-4-12 hex digits with dashes
+ return str.matches("[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}");
+ }
+
/**
* Get an immutable collection of the {@link UUIDSet range of GTIDs for a single server}.
* @return the {@link UUIDSet GTID ranges for each server}; never null
@@ -78,22 +125,32 @@ public Collection getUUIDSets() {
}
/**
- * Find the {@link UUIDSet} for the server with the specified UUID.
+ * Find the untagged {@link UUIDSet} for the server with the specified UUID.
* @param uuid the UUID of the server
* @return the {@link UUIDSet} for the identified server, or {@code null} if there are no GTIDs from that server.
*/
public UUIDSet getUUIDSet(String uuid) {
- return map.get(UUID.fromString(uuid));
+ return map.get(new Tsid(UUID.fromString(uuid), null));
+ }
+
+ /**
+ * Find the {@link UUIDSet} for the server with the specified UUID and tag.
+ * @param uuid the UUID of the server
+ * @param tag the GTID tag, or {@code null} for untagged GTIDs
+ * @return the {@link UUIDSet} for the identified TSID, or {@code null} if there are no GTIDs for that TSID.
+ */
+ public UUIDSet getUUIDSet(String uuid, String tag) {
+ return map.get(new Tsid(UUID.fromString(uuid), tag));
}
/**
* Add or replace the UUIDSet
* @param uuidSet UUIDSet to be added
- * @return the old {@link UUIDSet} for the server given in uuidSet param,
- * or {@code null} if there are no UUIDSet for the given server.
+ * @return the old {@link UUIDSet} for the TSID given in uuidSet param,
+ * or {@code null} if there are no UUIDSet for the given TSID.
*/
public UUIDSet putUUIDSet(UUIDSet uuidSet) {
- return map.put(uuidSet.getServerId(), uuidSet);
+ return map.put(uuidSet.getTsid(), uuidSet);
}
/**
@@ -115,9 +172,11 @@ public void addGtid(Object gtid) {
}
private boolean add(MySqlGtid mySqlGtid) {
- UUIDSet uuidSet = map.get(mySqlGtid.getServerId());
+ final Tsid tsid = new Tsid(mySqlGtid.getServerId(), mySqlGtid.getTag());
+ UUIDSet uuidSet = map.get(tsid);
if (uuidSet == null) {
- map.put(mySqlGtid.getServerId(), uuidSet = new UUIDSet(mySqlGtid.getServerId(), new ArrayList()));
+ map.put(tsid, uuidSet = new UUIDSet(mySqlGtid.getServerId(), mySqlGtid.getTag(),
+ new ArrayList()));
}
return uuidSet.add(mySqlGtid.getTransactionId());
}
@@ -140,7 +199,7 @@ public boolean isContainedWithin(GtidSet other) {
return true;
}
for (UUIDSet uuidSet : map.values()) {
- UUIDSet thatSet = other.map.get(uuidSet.getServerId());
+ UUIDSet thatSet = other.map.get(uuidSet.getTsid());
if (!uuidSet.isContainedWithin(thatSet)) {
return false;
}
@@ -168,8 +227,18 @@ public boolean equals(Object obj) {
@Override
public String toString() {
List gtids = new ArrayList();
- for (UUIDSet uuidSet : map.values()) {
- gtids.add(uuidSet.getServerId() + ":" + join(uuidSet.intervals, ":"));
+ for (Map.Entry> entry : getUUIDSetGroups().entrySet()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(entry.getKey()).append(':');
+ Iterator iter = entry.getValue().iterator();
+ if (iter.hasNext()) {
+ appendTaggedIntervals(sb, iter.next());
+ }
+ while (iter.hasNext()) {
+ sb.append(':');
+ appendTaggedIntervals(sb, iter.next());
+ }
+ gtids.add(sb.toString());
}
return join(gtids, ",");
}
@@ -189,6 +258,54 @@ private static String join(Collection> o, String delimiter) {
return sb.substring(0, sb.length() - delimiter.length());
}
+ private Map> getUUIDSetGroups() {
+ Map> groups = new LinkedHashMap>();
+ for (UUIDSet uuidSet : map.values()) {
+ List uuidSets = groups.get(uuidSet.getServerId());
+ if (uuidSets == null) {
+ uuidSets = new ArrayList();
+ groups.put(uuidSet.getServerId(), uuidSets);
+ }
+ uuidSets.add(uuidSet);
+ }
+ return groups;
+ }
+
+ private static void appendTaggedIntervals(StringBuilder sb, UUIDSet uuidSet) {
+ if (uuidSet.getTag() != null) {
+ sb.append(uuidSet.getTag()).append(':');
+ }
+ sb.append(join(uuidSet.intervals, ":"));
+ }
+
+ private static final class Tsid {
+
+ private final UUID uuid;
+ private final String tag;
+
+ private Tsid(UUID uuid, String tag) {
+ this.uuid = uuid;
+ this.tag = tag == null || tag.isEmpty() ? null : tag;
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * uuid.hashCode() + (tag == null ? 0 : tag.hashCode());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj instanceof Tsid) {
+ Tsid that = (Tsid) obj;
+ return this.uuid.equals(that.uuid) && Objects.equals(this.tag, that.tag);
+ }
+ return false;
+ }
+ }
+
/**
* A range of GTIDs for a single server with a specific UUID.
* @see GtidSet
@@ -196,6 +313,7 @@ private static String join(Collection> o, String delimiter) {
public static final class UUIDSet {
private final UUID uuid;
+ private final String tag;
private final List intervals;
public UUIDSet(String uuid, List intervals) {
@@ -203,10 +321,20 @@ public UUIDSet(String uuid, List intervals) {
}
public UUIDSet(UUID uuid, List intervals) {
+ this(uuid, null, intervals);
+ }
+
+ public UUIDSet(String uuid, String tag, List intervals) {
+ this(UUID.fromString(uuid), tag, intervals);
+ }
+
+ public UUIDSet(UUID uuid, String tag, List intervals) {
this.uuid = uuid;
+ this.tag = tag == null || tag.isEmpty() ? null : tag;
this.intervals = intervals;
if (intervals.size() > 1) {
- joinAdjacentIntervals(0);
+ Collections.sort(intervals);
+ joinAdjacentIntervals();
}
}
@@ -237,13 +365,23 @@ private boolean add(long transactionId) {
}
/**
- * Collapses intervals like a-(b-1):b-c into a-c (only in index+-1 range).
+ * Collapses adjacent or overlapping intervals near the supplied index.
*/
private void joinAdjacentIntervals(int index) {
for (int i = Math.min(index + 1, intervals.size() - 1), e = Math.max(index - 1, 0); i > e; i--) {
Interval a = intervals.get(i - 1), b = intervals.get(i);
- if (a.end + 1 == b.start) {
- a.end = b.end;
+ if (a.end + 1 >= b.start) {
+ a.end = Math.max(a.end, b.end);
+ intervals.remove(i);
+ }
+ }
+ }
+
+ private void joinAdjacentIntervals() {
+ for (int i = intervals.size() - 1; i > 0; i--) {
+ Interval a = intervals.get(i - 1), b = intervals.get(i);
+ if (a.end + 1 >= b.start) {
+ a.end = Math.max(a.end, b.end);
intervals.remove(i);
}
}
@@ -285,6 +423,14 @@ public UUID getServerId() {
return uuid;
}
+ public String getTag() {
+ return tag;
+ }
+
+ private Tsid getTsid() {
+ return new Tsid(uuid, tag);
+ }
+
/**
* Get the intervals of transaction numbers.
@@ -309,6 +455,9 @@ public boolean isContainedWithin(UUIDSet other) {
// not even the same server ...
return false;
}
+ if (!Objects.equals(this.tag, other.tag)) {
+ return false;
+ }
if (this.intervals.isEmpty()) {
return true;
}
@@ -333,7 +482,7 @@ public boolean isContainedWithin(UUIDSet other) {
@Override
public int hashCode() {
- return uuid.hashCode();
+ return 31 * uuid.hashCode() + (tag == null ? 0 : tag.hashCode());
}
@Override
@@ -344,6 +493,7 @@ public boolean equals(Object obj) {
if (obj instanceof UUIDSet) {
UUIDSet that = (UUIDSet) obj;
return this.uuid.equals(that.uuid) &&
+ Objects.equals(this.tag, that.tag) &&
this.getIntervals().equals(that.getIntervals());
}
return super.equals(obj);
@@ -356,6 +506,9 @@ public String toString() {
sb.append(',');
}
sb.append(uuid).append(':');
+ if (tag != null) {
+ sb.append(tag).append(':');
+ }
Iterator iter = intervals.iterator();
if (iter.hasNext()) {
sb.append(iter.next());
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java b/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java
index 0525d1ff..f34243b7 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java
@@ -208,6 +208,17 @@ public enum EventType {
*/
TRANSACTION_PAYLOAD(40),
+ /**
+ * Heartbeat event version 2.
+ */
+ HEARTBEAT_V2(41),
+
+ /**
+ * Global Transaction Identifier with tag support (MySQL 8.3+).
+ * Similar to GTID but includes an optional tag field for organizational purposes.
+ */
+ GTID_TAGGED(42),
+
/**
* MariaDB Support Events
*
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/GtidTaggedEventData.java b/src/main/java/com/github/shyiko/mysql/binlog/event/GtidTaggedEventData.java
new file mode 100644
index 00000000..6502b1c3
--- /dev/null
+++ b/src/main/java/com/github/shyiko/mysql/binlog/event/GtidTaggedEventData.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2013 Patrick Prasse
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.shyiko.mysql.binlog.event;
+
+/**
+ * GTID_TAGGED_LOG_EVENT (MySQL 8.3+) - Global Transaction Identifier with tag support.
+ * Similar to GTID_LOG_EVENT but includes an optional tag field for organizational purposes.
+ *
+ * @author Patrick Prasse
+ */
+public class GtidTaggedEventData implements EventData {
+
+ public static final byte COMMIT_FLAG = 1;
+
+ private MySqlGtid gtid;
+ private byte flags;
+ private long lastCommitted;
+ private long sequenceNumber;
+ private long immediateCommitTimestamp;
+ private long originalCommitTimestamp;
+ private long transactionLength;
+ private int immediateServerVersion;
+ private int originalServerVersion;
+ private long commitGroupTicket;
+
+ @Deprecated
+ public GtidTaggedEventData() {
+ }
+
+ public GtidTaggedEventData(MySqlGtid gtid, byte flags, long lastCommitted, long sequenceNumber,
+ long immediateCommitTimestamp, long originalCommitTimestamp,
+ long transactionLength, int immediateServerVersion,
+ int originalServerVersion, long commitGroupTicket) {
+ this.gtid = gtid;
+ this.flags = flags;
+ this.lastCommitted = lastCommitted;
+ this.sequenceNumber = sequenceNumber;
+ this.immediateCommitTimestamp = immediateCommitTimestamp;
+ this.originalCommitTimestamp = originalCommitTimestamp;
+ this.transactionLength = transactionLength;
+ this.immediateServerVersion = immediateServerVersion;
+ this.originalServerVersion = originalServerVersion;
+ this.commitGroupTicket = commitGroupTicket;
+ }
+
+ @Deprecated
+ public String getGtid() {
+ return gtid.toString();
+ }
+
+ @Deprecated
+ public void setGtid(String gtid) {
+ this.gtid = MySqlGtid.fromString(gtid);
+ }
+
+ public MySqlGtid getMySqlGtid() {
+ return gtid;
+ }
+
+ public byte getFlags() {
+ return flags;
+ }
+
+ public long getLastCommitted() {
+ return lastCommitted;
+ }
+
+ public long getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ public long getImmediateCommitTimestamp() {
+ return immediateCommitTimestamp;
+ }
+
+ public long getOriginalCommitTimestamp() {
+ return originalCommitTimestamp;
+ }
+
+ public long getTransactionLength() {
+ return transactionLength;
+ }
+
+ public int getImmediateServerVersion() {
+ return immediateServerVersion;
+ }
+
+ public int getOriginalServerVersion() {
+ return originalServerVersion;
+ }
+
+ public long getCommitGroupTicket() {
+ return commitGroupTicket;
+ }
+
+ @Deprecated
+ public void setFlags(byte flags) {
+ this.flags = flags;
+ }
+
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("GtidTaggedEventData");
+ sb.append("{flags=").append(flags).append(", gtid='").append(gtid).append('\'');
+ sb.append(", last_committed='").append(lastCommitted).append('\'');
+ sb.append(", sequence_number='").append(sequenceNumber).append('\'');
+ if (immediateCommitTimestamp != 0) {
+ sb.append(", immediate_commit_timestamp='").append(immediateCommitTimestamp).append('\'');
+ sb.append(", original_commit_timestamp='").append(originalCommitTimestamp).append('\'');
+ }
+ if (transactionLength != 0) {
+ sb.append(", transaction_length='").append(transactionLength).append('\'');
+ if (immediateServerVersion != 0) {
+ sb.append(", immediate_server_version='").append(immediateServerVersion).append('\'');
+ sb.append(", original_server_version='").append(originalServerVersion).append('\'');
+ }
+ }
+ if (commitGroupTicket != 0) {
+ sb.append(", commit_group_ticket='").append(commitGroupTicket).append('\'');
+ }
+ sb.append('}');
+ return sb.toString();
+ }
+
+}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/MySqlGtid.java b/src/main/java/com/github/shyiko/mysql/binlog/event/MySqlGtid.java
index c7fc3f2c..bb4929ef 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/event/MySqlGtid.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/event/MySqlGtid.java
@@ -2,25 +2,107 @@
import java.util.UUID;
+/**
+ * Represents a MySQL GTID (Global Transaction Identifier).
+ * Supports both legacy format (uuid:transaction_id) and MySQL 8.3+ tagged format (uuid:tag:transaction_id).
+ */
public class MySqlGtid {
+ public static final int TAG_MAX_LENGTH = 32;
+
+ private final String tag;
private final UUID serverId;
private final long transactionId;
- public MySqlGtid(UUID serverId, long transactionId) {
+ /**
+ * Creates a MySqlGtid without a tag (legacy format).
+ *
+ * @param serverId the server UUID
+ * @param transactionId the transaction ID
+ */
+ public MySqlGtid(final UUID serverId, final long transactionId) {
+ this(null, serverId, transactionId);
+ }
+
+ /**
+ * Creates a MySqlGtid with an optional tag (MySQL 8.3+ format).
+ *
+ * @param tag the optional tag (null or empty for legacy format)
+ * @param serverId the server UUID
+ * @param transactionId the transaction ID
+ */
+ public MySqlGtid(final String tag, final UUID serverId, final long transactionId) {
+ this.tag = tag;
this.serverId = serverId;
this.transactionId = transactionId;
}
- public static MySqlGtid fromString(String gtid) {
- String[] split = gtid.split(":");
- String sourceId = split[0];
- long transactionId = Long.parseLong(split[1]);
- return new MySqlGtid(UUID.fromString(sourceId), transactionId);
+ /**
+ * Parses a GTID string in either legacy or tagged format.
+ *
+ * Supported formats:
+ *
+ * - Legacy: uuid:transaction_id
+ * - Tagged (MySQL 8.3+): uuid:tag:transaction_id
+ *
+ *
+ * @param gtid the GTID string to parse
+ * @return the parsed MySqlGtid
+ * @throws IllegalArgumentException if the format is invalid
+ */
+ public static MySqlGtid fromString(final String gtid) {
+ final String[] split = gtid.split(":");
+
+ if (split.length == 2) {
+ // Legacy format: uuid:transaction_id
+ final String sourceId = split[0];
+ final long transactionId = Long.parseLong(split[1]);
+ return new MySqlGtid(UUID.fromString(sourceId), transactionId);
+ }
+ else if (split.length == 3) {
+ final String sourceId;
+ final String tag;
+ if (isValidUuidFormat(split[0])) {
+ // Tagged format: uuid:tag:transaction_id
+ sourceId = split[0];
+ tag = split[1];
+ }
+ else {
+ // Backward compatibility for the previous tag-first representation.
+ tag = split[0];
+ sourceId = split[1];
+ }
+ if (tag.length() > TAG_MAX_LENGTH) {
+ throw new IllegalArgumentException(
+ "Invalid GTID format: tag length " + tag.length() +
+ " exceeds maximum " + TAG_MAX_LENGTH
+ );
+ }
+ final long transactionId = Long.parseLong(split[2]);
+ return new MySqlGtid(tag, UUID.fromString(sourceId), transactionId);
+ }
+ else {
+ throw new IllegalArgumentException(
+ "Invalid GTID format: " + gtid +
+ ". Expected format: 'uuid:transaction_id' or 'uuid:tag:transaction_id'"
+ );
+ }
}
@Override
public String toString() {
- return serverId.toString()+":"+transactionId;
+ if (tag != null && !tag.isEmpty()) {
+ return serverId.toString() + ":" + tag + ":" + transactionId;
+ }
+ return serverId.toString() + ":" + transactionId;
+ }
+
+ /**
+ * Gets the optional tag (MySQL 8.3+).
+ *
+ * @return the tag, or null if not present
+ */
+ public String getTag() {
+ return tag;
}
public UUID getServerId() {
@@ -30,4 +112,8 @@ public UUID getServerId() {
public long getTransactionId() {
return transactionId;
}
-}
+
+ private static boolean isValidUuidFormat(final String str) {
+ return str.matches("[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}");
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java
index 6ef2616d..1b3b4776 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java
@@ -116,6 +116,8 @@ private void registerDefaultEventDataDeserializers() {
new RowsQueryEventDataDeserializer());
eventDataDeserializers.put(EventType.GTID,
new GtidEventDataDeserializer());
+ eventDataDeserializers.put(EventType.GTID_TAGGED,
+ new GtidTaggedEventDataDeserializer());
eventDataDeserializers.put(EventType.PREVIOUS_GTIDS,
new PreviousGtidSetDeserializer());
eventDataDeserializers.put(EventType.XA_PREPARE,
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/GtidTaggedEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/GtidTaggedEventDataDeserializer.java
new file mode 100644
index 00000000..8505e619
--- /dev/null
+++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/GtidTaggedEventDataDeserializer.java
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2013 Patrick Prasse
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.shyiko.mysql.binlog.event.deserialization;
+
+import com.github.shyiko.mysql.binlog.event.GtidTaggedEventData;
+import com.github.shyiko.mysql.binlog.event.MySqlGtid;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+/**
+ * Deserializer for GTID_TAGGED_LOG_EVENT (MySQL 8.3+).
+ *
+ * @author Patrick Prasse
+ */
+public class GtidTaggedEventDataDeserializer implements EventDataDeserializer {
+ private static final int SERIALIZATION_FORMAT_VERSION = 1;
+
+ @Override
+ public GtidTaggedEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
+ final Cursor cursor = new Cursor(inputStream.read(inputStream.available()));
+ final long serializationFormatVersion = cursor.readUnsignedVarLong();
+ if (serializationFormatVersion != SERIALIZATION_FORMAT_VERSION) {
+ throw new IOException("Unexpected GTID_TAGGED serialization format version " + serializationFormatVersion);
+ }
+ final int eventEnd = cursor.checkedInt(cursor.readUnsignedVarLong(), "GTID_TAGGED event length");
+ cursor.readUnsignedVarLong(); // last non-ignorable field id
+
+ cursor.readFieldId(0);
+ final byte flags = (byte) cursor.readUnsignedVarLong();
+
+ cursor.readFieldId(1);
+ final long sourceIdMostSignificantBits = cursor.readUuidLong();
+ final long sourceIdLeastSignificantBits = cursor.readUuidLong();
+
+ cursor.readFieldId(2);
+ final long transactionId = cursor.readSignedVarLong();
+
+ cursor.readFieldId(3);
+ final String tag = cursor.readString(MySqlGtid.TAG_MAX_LENGTH);
+
+ final MySqlGtid gtid = new MySqlGtid(
+ tag.isEmpty() ? null : tag,
+ new UUID(sourceIdMostSignificantBits, sourceIdLeastSignificantBits),
+ transactionId
+ );
+
+ cursor.readFieldId(4);
+ final long lastCommitted = cursor.readSignedVarLong();
+
+ cursor.readFieldId(5);
+ final long sequenceNumber = cursor.readSignedVarLong();
+
+ cursor.readFieldId(6);
+ final long immediateCommitTimestamp = cursor.readUnsignedVarLong();
+
+ final long originalCommitTimestamp;
+ if (cursor.nextFieldId(eventEnd) == 7) {
+ cursor.readFieldId(7);
+ originalCommitTimestamp = cursor.readUnsignedVarLong();
+ } else {
+ originalCommitTimestamp = immediateCommitTimestamp;
+ }
+
+ cursor.readFieldId(8);
+ final long transactionLength = cursor.readUnsignedVarLong();
+
+ cursor.readFieldId(9);
+ final int immediateServerVersion = cursor.checkedInt(cursor.readUnsignedVarLong(), "immediate server version");
+
+ final int originalServerVersion;
+ if (cursor.nextFieldId(eventEnd) == 10) {
+ cursor.readFieldId(10);
+ originalServerVersion = cursor.checkedInt(cursor.readUnsignedVarLong(), "original server version");
+ } else {
+ originalServerVersion = immediateServerVersion;
+ }
+
+ long commitGroupTicket = 0;
+ if (cursor.nextFieldId(eventEnd) == 11) {
+ cursor.readFieldId(11);
+ commitGroupTicket = cursor.readUnsignedVarLong();
+ }
+
+ return new GtidTaggedEventData(gtid, flags, lastCommitted, sequenceNumber,
+ immediateCommitTimestamp, originalCommitTimestamp, transactionLength,
+ immediateServerVersion, originalServerVersion, commitGroupTicket);
+ }
+
+ private static final class Cursor {
+ private final byte[] bytes;
+ private int position;
+
+ private Cursor(byte[] bytes) {
+ this.bytes = bytes;
+ }
+
+ private void readFieldId(int expectedFieldId) throws IOException {
+ final long fieldId = readUnsignedVarLong();
+ if (fieldId != expectedFieldId) {
+ throw new IOException("Expected GTID_TAGGED field " + expectedFieldId + " but found " + fieldId);
+ }
+ }
+
+ private int nextFieldId(int eventEnd) throws IOException {
+ if (position >= Math.min(eventEnd, bytes.length)) {
+ return -1;
+ }
+ final int savedPosition = position;
+ final long fieldId = readUnsignedVarLong();
+ position = savedPosition;
+ return checkedInt(fieldId, "field id");
+ }
+
+ private long readSignedVarLong() throws IOException {
+ final long value = readUnsignedVarLong();
+ return (value & 1) == 0 ? value >>> 1 : -(value >>> 1) - 1;
+ }
+
+ private long readUnsignedVarLong() throws IOException {
+ final int firstByte = readByte();
+ int byteCount = 1;
+ while (byteCount < 9 && ((firstByte >>> (byteCount - 1)) & 1) == 1) {
+ byteCount++;
+ }
+ if (position + byteCount - 1 > bytes.length) {
+ throw new IOException("Unexpected end of GTID_TAGGED variable-length integer");
+ }
+
+ long result = (firstByte & 0xffL) >>> byteCount;
+ if (byteCount == 1) {
+ return result;
+ }
+
+ long remaining = 0;
+ for (int i = 0; i < byteCount - 1; ++i) {
+ remaining |= (long) readByte() << (i << 3);
+ }
+ final int shift = 8 - byteCount + ((byteCount + 7) >> 4);
+ return result | (remaining << shift);
+ }
+
+ private long readUuidLong() throws IOException {
+ long result = 0;
+ for (int i = 0; i < 8; ++i) {
+ final long value = readUnsignedVarLong();
+ if (value > 0xff) {
+ throw new IOException("GTID_TAGGED UUID byte exceeds byte range: " + value);
+ }
+ result = (result << 8) | value;
+ }
+ return result;
+ }
+
+ private String readString(int maxLength) throws IOException {
+ final int length = checkedInt(readUnsignedVarLong(), "tag length");
+ if (length > maxLength) {
+ throw new IOException("GTID tag length " + length + " exceeds maximum " + maxLength);
+ }
+ if (position + length > bytes.length) {
+ throw new IOException("Unexpected end of GTID tag");
+ }
+ final String result = new String(bytes, position, length, StandardCharsets.UTF_8);
+ position += length;
+ return result;
+ }
+
+ private int checkedInt(long value, String name) throws IOException {
+ if (value > Integer.MAX_VALUE) {
+ throw new IOException("GTID_TAGGED " + name + " exceeds integer range: " + value);
+ }
+ return (int) value;
+ }
+
+ private int readByte() throws IOException {
+ if (position >= bytes.length) {
+ throw new IOException("Unexpected end of GTID_TAGGED event");
+ }
+ return bytes[position++] & 0xff;
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/PreviousGtidSetDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/PreviousGtidSetDeserializer.java
index ca1fee73..43b176a8 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/PreviousGtidSetDeserializer.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/PreviousGtidSetDeserializer.java
@@ -18,6 +18,8 @@
import static java.lang.String.format;
import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
import com.github.shyiko.mysql.binlog.event.PreviousGtidSetEventData;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
@@ -42,17 +44,14 @@ public PreviousGtidSetEventData deserialize(
nUuidsMask = 0x00ffffffffffff00L;
nUuids = (int) ((nUuidsEncoded & nUuidsMask) >> 8);
}
- String[] gtids = new String[nUuids];
+ Map gtids = new LinkedHashMap();
for (int i = 0; i < nUuids; i++) {
String uuid = formatUUID(inputStream.read(16));
+ String tag = null;
if (formatEncoded==TAGGED_GTID) {
int len = inputStream.readInteger(1);
- String tag = inputStream.readString(len >> 1);
-
- if (!tag.isEmpty()) {
- uuid += ":" + tag;
- }
+ tag = inputStream.readString(len >> 1);
}
int nIntervals = inputStream.readInteger(8);
@@ -63,9 +62,18 @@ public PreviousGtidSetEventData deserialize(
intervals[j] = start + "-" + (end - 1);
}
- gtids[i] = format("%s:%s", uuid, join(intervals, ":"));
+ String uuidSet = gtids.get(uuid);
+ if (uuidSet == null) {
+ uuidSet = uuid;
+ }
+ if (tag != null && !tag.isEmpty()) {
+ uuidSet = format("%s:%s:%s", uuidSet, tag, join(intervals, ":"));
+ } else {
+ uuidSet = format("%s:%s", uuidSet, join(intervals, ":"));
+ }
+ gtids.put(uuid, uuidSet);
}
- return new PreviousGtidSetEventData(join(gtids, ","));
+ return new PreviousGtidSetEventData(join(gtids.values().toArray(new String[0]), ","));
}
private String formatUUID(byte[] bytes) {
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogGtidCommand.java b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogGtidCommand.java
index 6f28eb48..a0efd14d 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogGtidCommand.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogGtidCommand.java
@@ -48,15 +48,23 @@ public byte[] toByteArray() throws IOException {
buffer.writeString(this.binlogFilename);
buffer.writeLong(this.binlogPosition, 8);
Collection uuidSets = gtidSet.getUUIDSets();
+ boolean taggedFormat = containsTaggedGtid(uuidSets);
int dataSize = 8 /* number of uuidSets */;
for (GtidSet.UUIDSet uuidSet : uuidSets) {
- dataSize += 16 /* uuid */ + 8 /* number of intervals */ +
+ dataSize += 16 /* uuid */ + (taggedFormat ? 1 + getTagLength(uuidSet) : 0) + 8 /* number of intervals */ +
uuidSet.getIntervals().size() /* number of intervals */ * 16 /* start-end */;
}
buffer.writeInteger(dataSize, 4);
- buffer.writeLong(uuidSets.size(), 8);
+ buffer.writeLong(getEncodedUuidSetCount(uuidSets.size(), taggedFormat), 8);
for (GtidSet.UUIDSet uuidSet : uuidSets) {
buffer.write(hexToByteArray(uuidSet.getUUID().replace("-", "")));
+ if (taggedFormat) {
+ String tag = uuidSet.getTag();
+ buffer.writeInteger(getTagLength(uuidSet) << 1, 1);
+ if (tag != null) {
+ buffer.writeString(tag);
+ }
+ }
Collection intervals = uuidSet.getIntervals();
buffer.writeLong(intervals.size(), 8);
for (GtidSet.Interval interval : intervals) {
@@ -67,6 +75,27 @@ public byte[] toByteArray() throws IOException {
return buffer.toByteArray();
}
+ private static boolean containsTaggedGtid(Collection uuidSets) {
+ for (GtidSet.UUIDSet uuidSet : uuidSets) {
+ if (uuidSet.getTag() != null && !uuidSet.getTag().isEmpty()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static int getTagLength(GtidSet.UUIDSet uuidSet) {
+ String tag = uuidSet.getTag();
+ return tag == null ? 0 : tag.length();
+ }
+
+ private static long getEncodedUuidSetCount(int uuidSetCount, boolean taggedFormat) {
+ if (!taggedFormat) {
+ return uuidSetCount;
+ }
+ return (1L << 56) | ((long) uuidSetCount << 8) | 1L;
+ }
+
private static byte[] hexToByteArray(String uuid) {
byte[] b = new byte[uuid.length() / 2];
for (int i = 0, j = 0; j < uuid.length(); j += 2) {
diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientBinlogCompressIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientBinlogCompressIntegrationTest.java
index 46b44e08..0f40d364 100644
--- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientBinlogCompressIntegrationTest.java
+++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientBinlogCompressIntegrationTest.java
@@ -3,6 +3,8 @@
import org.testng.SkipException;
import org.testng.annotations.Test;
+import com.github.shyiko.mysql.binlog.network.SSLMode;
+
/**
* @author vjuranek
*/
diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDIntegrationTest.java
index ab06769a..cb4cf187 100644
--- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDIntegrationTest.java
+++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDIntegrationTest.java
@@ -48,13 +48,13 @@ protected TestDatabaseContainerOptions getOptions() {
public void testGTIDAdvancesStatementBased() throws Exception {
try {
master.execute("set global binlog_format=statement");
- slave.execute("stop slave", "set global binlog_format=statement", "start slave");
+ slave.execute("stop replica", "set global binlog_format=statement", "start replica");
master.reconnect();
master.execute("use test");
testGTIDAdvances();
} finally {
master.execute("set global binlog_format=row");
- slave.execute("stop slave", "set global binlog_format=row", "start slave");
+ slave.execute("stop replica", "set global binlog_format=row", "start replica");
master.reconnect();
master.execute("use test");
}
@@ -197,7 +197,12 @@ public void execute(Statement statement) throws SQLException {
private String getExecutedGtidSet(MySQLConnection master) throws SQLException {
final String[] initialGTIDSet = new String[1];
- master.query("show master status", new Callback() {
+ // MySQL 8.4+ renamed SHOW MASTER STATUS to SHOW BINARY LOG STATUS
+ String version = System.getProperty("mysql.image", "mysql:8.0");
+ boolean isMySQL84Plus = version.contains("8.4") || version.contains("8.5") || version.contains("9.");
+ String statusQuery = isMySQL84Plus ? "SHOW BINARY LOG STATUS" : "SHOW MASTER STATUS";
+
+ master.query(statusQuery, new Callback() {
@Override
public void execute(ResultSet rs) throws SQLException {
rs.next();
diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDTagIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDTagIntegrationTest.java
new file mode 100644
index 00000000..ddc8547a
--- /dev/null
+++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDTagIntegrationTest.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2024 Debezium Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.shyiko.mysql.binlog;
+
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.GtidEventData;
+import com.github.shyiko.mysql.binlog.event.GtidTaggedEventData;
+import com.github.shyiko.mysql.binlog.event.MySqlGtid;
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import org.testng.SkipException;
+import org.testng.annotations.Test;
+
+import java.sql.Statement;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Integration tests for MySQL 8.3+ GTID tag support using SET gtid_next.
+ *
+ * @author Debezium Authors
+ */
+public class BinaryLogClientGTIDTagIntegrationTest extends BinaryLogClientIntegrationTest {
+
+ private static final String TEST_TAG = "testtag";
+
+ @Override
+ protected TestDatabaseContainerOptions getOptions() {
+ if (!this.mysqlVersion.atLeast(8, 3)) {
+ throw new SkipException("GTID tags require MySQL 8.3+");
+ }
+
+ TestDatabaseContainerOptions options = new TestDatabaseContainerOptions();
+ options.gtid = true;
+ return options;
+ }
+
+ @Test
+ public void testGtidTagWithSetGtidNext() throws Exception {
+ master.execute("CREATE TABLE if not exists gtid_tag_test (id int primary key, value varchar(50))");
+
+ final String initialGTIDSet = getExecutedGtidSet();
+ assertNotNull(initialGTIDSet, "Initial GTID set is null");
+
+ final AtomicReference capturedTag = new AtomicReference<>();
+ final AtomicReference capturedGtid = new AtomicReference<>();
+
+ try {
+ client.disconnect();
+ final BinaryLogClient taggedClient = new BinaryLogClient(master.hostname(), master.port(),
+ master.username(), master.password());
+
+ taggedClient.setGtidSet(initialGTIDSet);
+ taggedClient.registerEventListener(new BinaryLogClient.EventListener() {
+ @Override
+ public void onEvent(Event event) {
+ final EventType eventType = event.getHeader().getEventType();
+ if (eventType == EventType.GTID) {
+ final GtidEventData gtidData = (GtidEventData) event.getData();
+ final MySqlGtid gtid = gtidData.getMySqlGtid();
+ capturedGtid.set(gtid);
+ capturedTag.set(gtid.getTag());
+ } else if (eventType == EventType.GTID_TAGGED) {
+ final GtidTaggedEventData gtidData = (GtidTaggedEventData) event.getData();
+ final MySqlGtid gtid = gtidData.getMySqlGtid();
+ capturedGtid.set(gtid);
+ capturedTag.set(gtid.getTag());
+ }
+ }
+ });
+ taggedClient.registerEventListener(new TraceEventListener());
+ taggedClient.registerEventListener(eventListener);
+
+ try {
+ eventListener.reset();
+ taggedClient.connect(DEFAULT_TIMEOUT);
+
+ // Execute a transaction with a tagged GTID using SET gtid_next.
+ master.execute(new Callback() {
+ @Override
+ public void execute(Statement statement) throws java.sql.SQLException {
+ statement.execute("SET SESSION gtid_next = 'AUTOMATIC:" + TEST_TAG + "'");
+ statement.execute("INSERT INTO gtid_tag_test (id, value) VALUES (1, 'tagged')");
+ }
+ });
+ master.execute("SET SESSION gtid_next = 'AUTOMATIC'");
+
+ // Wait for the tagged GTID event.
+ eventListener.waitForAtLeast(EventType.GTID_TAGGED, 1, TimeUnit.SECONDS.toMillis(4));
+
+ // Verify the tag was captured
+ assertNotNull(capturedTag.get(), "GTID tag was not captured");
+ assertEquals(capturedTag.get(), TEST_TAG, "GTID tag does not match expected value");
+
+ // Verify the full GTID includes the tag
+ assertNotNull(capturedGtid.get(), "GTID was not captured");
+ String gtidString = capturedGtid.get().toString();
+ assertNotNull(gtidString, "GTID toString returned null");
+ assertEquals(gtidString.split(":")[1], TEST_TAG, "GTID string does not include expected tag");
+
+ } finally {
+ taggedClient.disconnect();
+ }
+ } finally {
+ client.connect(DEFAULT_TIMEOUT);
+ master.execute("DROP TABLE IF EXISTS gtid_tag_test");
+ }
+ }
+
+ @Test
+ public void testMixedTaggedAndNonTaggedGtids() throws Exception {
+ master.execute("CREATE TABLE if not exists mixed_gtid_test (id int primary key, value varchar(50))");
+
+ final String initialGTIDSet = getExecutedGtidSet();
+ assertNotNull(initialGTIDSet, "Initial GTID set is null");
+
+ final AtomicReference firstTag = new AtomicReference<>();
+ final AtomicReference secondTag = new AtomicReference<>();
+ final CountDownLatch mixedRows = new CountDownLatch(2);
+
+ try {
+ client.disconnect();
+ final BinaryLogClient taggedClient = new BinaryLogClient(master.hostname(), master.port(),
+ master.username(), master.password());
+
+ taggedClient.setGtidSet(initialGTIDSet);
+
+ taggedClient.registerEventListener(new BinaryLogClient.EventListener() {
+ private final AtomicReference transactionTag = new AtomicReference<>();
+ private long mixedTableId = -1;
+
+ @Override
+ public void onEvent(Event event) {
+ final EventType eventType = event.getHeader().getEventType();
+ if (eventType == EventType.GTID) {
+ final GtidEventData gtidData = (GtidEventData) event.getData();
+ transactionTag.set(gtidData.getMySqlGtid().getTag());
+ } else if (eventType == EventType.GTID_TAGGED) {
+ final GtidTaggedEventData gtidData = (GtidTaggedEventData) event.getData();
+ transactionTag.set(gtidData.getMySqlGtid().getTag());
+ } else if (eventType == EventType.TABLE_MAP) {
+ final TableMapEventData tableMap = (TableMapEventData) event.getData();
+ if ("mixed_gtid_test".equals(tableMap.getTable())) {
+ mixedTableId = tableMap.getTableId();
+ }
+ } else if (EventType.isWrite(eventType)) {
+ final WriteRowsEventData writeRows = (WriteRowsEventData) event.getData();
+ if (writeRows.getTableId() == mixedTableId) {
+ for (Object[] row : writeRows.getRows()) {
+ if (Integer.valueOf(1).equals(row[0])) {
+ firstTag.set(transactionTag.get());
+ mixedRows.countDown();
+ } else if (Integer.valueOf(2).equals(row[0])) {
+ secondTag.set(transactionTag.get());
+ mixedRows.countDown();
+ }
+ }
+ }
+ }
+ }
+ });
+ taggedClient.registerEventListener(eventListener);
+
+ try {
+ eventListener.reset();
+ taggedClient.connect(DEFAULT_TIMEOUT);
+
+ // Execute one tagged transaction and one untagged transaction.
+ master.execute(new Callback() {
+ @Override
+ public void execute(Statement statement) throws java.sql.SQLException {
+ statement.execute("SET SESSION gtid_next = 'AUTOMATIC:" + TEST_TAG + "'");
+ statement.execute("INSERT INTO mixed_gtid_test (id, value) VALUES (1, 'tagged')");
+ }
+ });
+ master.execute("SET SESSION gtid_next = 'AUTOMATIC'");
+ master.execute("INSERT INTO mixed_gtid_test (id, value) VALUES (2, 'not-tagged')");
+
+ // Wait for both GTID event types and the rows that belong to this test.
+ eventListener.waitForAtLeast(EventType.GTID_TAGGED, 1, TimeUnit.SECONDS.toMillis(4));
+ eventListener.waitForAtLeast(EventType.GTID, 1, TimeUnit.SECONDS.toMillis(4));
+ assertTrue(mixedRows.await(4, TimeUnit.SECONDS), "Timed out waiting for mixed GTID test rows");
+
+ // Verify the tagged insert was associated with the tag.
+ assertNotNull(firstTag.get(), "First GTID tag was not captured");
+ assertEquals(firstTag.get(), TEST_TAG, "First GTID tag does not match");
+
+ // Verify the untagged insert was not associated with a tag.
+ assertNull(secondTag.get(), "Second GTID should not have a tag");
+
+ } finally {
+ taggedClient.disconnect();
+ }
+ } finally {
+ client.connect(DEFAULT_TIMEOUT);
+ master.execute("DROP TABLE IF EXISTS mixed_gtid_test");
+ }
+ }
+
+ private String getExecutedGtidSet() throws Exception {
+ final String[] gtidSet = new String[1];
+ // MySQL 8.4+ renamed SHOW MASTER STATUS to SHOW BINARY LOG STATUS
+ String version = System.getProperty("mysql.image", "mysql:8.0");
+ boolean isMySQL84Plus = version.contains("8.4") || version.contains("8.5") || version.contains("9.");
+ String statusQuery = isMySQL84Plus ? "SHOW BINARY LOG STATUS" : "SHOW MASTER STATUS";
+
+ master.query(statusQuery, new Callback() {
+ @Override
+ public void execute(java.sql.ResultSet rs) throws java.sql.SQLException {
+ if (rs.next()) {
+ gtidSet[0] = rs.getString("Executed_Gtid_Set");
+ }
+ }
+ });
+ return gtidSet[0];
+ }
+}
diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java
index 80203986..f6e1dd6c 100644
--- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java
+++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java
@@ -1013,7 +1013,7 @@ public void testMysql8FastAuth() throws Exception {
@Test
public void testSHA2CachingAuthAsDefault() throws Exception {
- if ( !mysqlVersion.atLeast(8, 0) )
+ if ( !mysqlVersion.atLeast(8, 0) || !mysqlVersion.lessThan(8, 4) )
throw new SkipException("skipping mysql8 auth test");
TestDatabaseContainerOptions opts = new TestDatabaseContainerOptions();
diff --git a/src/test/java/com/github/shyiko/mysql/binlog/GtidSetTest.java b/src/test/java/com/github/shyiko/mysql/binlog/GtidSetTest.java
index d6e212f9..0d03b3b1 100644
--- a/src/test/java/com/github/shyiko/mysql/binlog/GtidSetTest.java
+++ b/src/test/java/com/github/shyiko/mysql/binlog/GtidSetTest.java
@@ -20,9 +20,11 @@
import com.github.shyiko.mysql.binlog.event.MySqlGtid;
import org.testng.annotations.Test;
+import java.util.Collection;
import java.util.LinkedList;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
@@ -184,4 +186,148 @@ public void testAddAnotherObjectAsGtidFails() {
GtidSet gtidSet = new GtidSet("");
gtidSet.addGtid(MariadbGtidSet.MariaGtid.parse("1-2-3"));
}
+
+ @Test
+ public void testParseTaggedGtidSet() {
+ // Test parsing GTID set with tagged GTIDs (MySQL 8.3+)
+ final GtidSet gtidSet = new GtidSet("24bc7850-2c16-11e6-a073-0242ac110002:mytag:1-5");
+
+ final Collection uuidSets = gtidSet.getUUIDSets();
+ assertEquals(uuidSets.size(), 1);
+
+ final GtidSet.UUIDSet uuidSet = uuidSets.iterator().next();
+ assertEquals(uuidSet.getServerId().toString(), "24bc7850-2c16-11e6-a073-0242ac110002");
+ assertEquals(uuidSet.getTag(), "mytag");
+ assertEquals(uuidSet.getIntervals().size(), 1);
+ assertEquals(uuidSet.getIntervals().get(0).getStart(), 1L);
+ assertEquals(uuidSet.getIntervals().get(0).getEnd(), 5L);
+ assertEquals(gtidSet.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:mytag:1-5");
+ }
+
+ @Test
+ public void testParseTaggedGtidSetWithMultipleIntervals() {
+ // Test parsing tagged GTID set with multiple intervals
+ final GtidSet gtidSet = new GtidSet("24bc7850-2c16-11e6-a073-0242ac110002:prod:1-5:10-15:20");
+
+ final Collection uuidSets = gtidSet.getUUIDSets();
+ assertEquals(uuidSets.size(), 1);
+
+ final GtidSet.UUIDSet uuidSet = uuidSets.iterator().next();
+ assertEquals(uuidSet.getServerId().toString(), "24bc7850-2c16-11e6-a073-0242ac110002");
+ assertEquals(uuidSet.getTag(), "prod");
+ assertEquals(uuidSet.getIntervals().size(), 3);
+ assertEquals(uuidSet.getIntervals().get(0).getStart(), 1L);
+ assertEquals(uuidSet.getIntervals().get(0).getEnd(), 5L);
+ assertEquals(uuidSet.getIntervals().get(1).getStart(), 10L);
+ assertEquals(uuidSet.getIntervals().get(1).getEnd(), 15L);
+ assertEquals(uuidSet.getIntervals().get(2).getStart(), 20L);
+ assertEquals(uuidSet.getIntervals().get(2).getEnd(), 20L);
+ }
+
+ @Test
+ public void testParseMixedGtidSet() {
+ // Test parsing GTID set with both tagged and non-tagged GTIDs
+ final GtidSet gtidSet = new GtidSet(
+ "24bc7850-2c16-11e6-a073-0242ac110002:1-5," +
+ "aae57b2f-8e44-11ee-a3d6-a036bcda1a41:mytag:1-10"
+ );
+
+ final Collection uuidSets = gtidSet.getUUIDSets();
+ assertEquals(uuidSets.size(), 2);
+
+ // Verify both UUIDs are present
+ assertNotNull(gtidSet.getUUIDSet("24bc7850-2c16-11e6-a073-0242ac110002"));
+ assertNotNull(gtidSet.getUUIDSet("aae57b2f-8e44-11ee-a3d6-a036bcda1a41", "mytag"));
+ }
+
+ @Test
+ public void testParseExecutedGtidSetWithTaggedIntervals() {
+ final GtidSet gtidSet = new GtidSet(
+ "24bc7850-2c16-11e6-a073-0242ac110002:1-20:testtag:1-3"
+ );
+
+ assertEquals(gtidSet.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:1-20:testtag:1-3");
+ }
+
+ @Test
+ public void testParseExecutedGtidSetWithSeparatedTaggedIntervals() {
+ final GtidSet gtidSet = new GtidSet(
+ "24bc7850-2c16-11e6-a073-0242ac110002:1-5:testtag:7-9:othertag:6"
+ );
+
+ assertEquals(gtidSet.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:1-5:testtag:7-9:othertag:6-6");
+ }
+
+ @Test
+ public void testParseMixedGtidSetMultipleServers() {
+ // Test parsing complex GTID set with multiple servers, some tagged
+ final GtidSet gtidSet = new GtidSet(
+ "24bc7850-2c16-11e6-a073-0242ac110002:1-5:10," +
+ "aae57b2f-8e44-11ee-a3d6-a036bcda1a41:tag1:1-10:15-20," +
+ "994ab859-8ea8-11ee-a568-a036bcda1a41:1-3," +
+ "bd9794e0-1d65-11ed-a7e7-0adb305b3a12:tag2:5-9"
+ );
+
+ final Collection uuidSets = gtidSet.getUUIDSets();
+ assertEquals(uuidSets.size(), 4);
+ }
+
+ @Test
+ public void testAddTaggedMySqlGtid() {
+ // Test adding tagged GTID to set
+ final GtidSet gtidSet = new GtidSet("");
+ gtidSet.addGtid(MySqlGtid.fromString("00000000-0000-0000-0000-000000000000:mytag:2"));
+
+ assertEquals(gtidSet.toString(), "00000000-0000-0000-0000-000000000000:mytag:2-2");
+ }
+
+ @Test
+ public void testAddTaggedMySqlGtidToExistingSet() {
+ // Test adding tagged GTID to existing set
+ final GtidSet gtidSet = new GtidSet("00000000-0000-0000-0000-000000000000:1");
+ gtidSet.addGtid(MySqlGtid.fromString("00000000-0000-0000-0000-000000000000:mytag:2"));
+
+ assertEquals(gtidSet.toString(), "00000000-0000-0000-0000-000000000000:1-1:mytag:2-2");
+ }
+
+ @Test
+ public void testParseTaggedGtidSetWithComplexTag() {
+ // Test parsing GTID set with complex tag name
+ final GtidSet gtidSet = new GtidSet("24bc7850-2c16-11e6-a073-0242ac110002:prod_db_01:1-100");
+
+ final Collection uuidSets = gtidSet.getUUIDSets();
+ assertEquals(uuidSets.size(), 1);
+
+ final GtidSet.UUIDSet uuidSet = uuidSets.iterator().next();
+ assertEquals(uuidSet.getServerId().toString(), "24bc7850-2c16-11e6-a073-0242ac110002");
+ assertEquals(uuidSet.getTag(), "prod_db_01");
+ assertEquals(uuidSet.getIntervals().size(), 1);
+ assertEquals(uuidSet.getIntervals().get(0).getStart(), 1L);
+ assertEquals(uuidSet.getIntervals().get(0).getEnd(), 100L);
+ }
+
+ @Test
+ public void testParseTaggedAndUntaggedGtidSetsAreDistinct() {
+ final GtidSet legacySet = new GtidSet("24bc7850-2c16-11e6-a073-0242ac110002:1-5");
+ final GtidSet taggedSet = new GtidSet("24bc7850-2c16-11e6-a073-0242ac110002:tag:1-5");
+
+ // Tagged and untagged GTIDs from the same UUID are different TSIDs.
+ final GtidSet.UUIDSet legacyUuidSet = legacySet.getUUIDSet("24bc7850-2c16-11e6-a073-0242ac110002");
+
+ assertNotNull(legacyUuidSet);
+ assertEquals(legacyUuidSet.getTag(), null);
+ assertEquals(taggedSet.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:tag:1-5");
+ assertNotEquals(legacySet, taggedSet);
+ }
+
+ @Test
+ public void testParsePreviousTagFirstGtidSetFormat() {
+ final GtidSet gtidSet = new GtidSet("tag:24bc7850-2c16-11e6-a073-0242ac110002:1-5");
+
+ final GtidSet.UUIDSet uuidSet = gtidSet.getUUIDSet("24bc7850-2c16-11e6-a073-0242ac110002", "tag");
+ assertNotNull(uuidSet);
+ assertEquals(uuidSet.getIntervals().get(0), new Interval(1, 5));
+ assertEquals(gtidSet.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:tag:1-5");
+ }
+
}
diff --git a/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java b/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java
index 8d84c2bd..56676035 100644
--- a/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java
+++ b/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java
@@ -145,7 +145,13 @@ public void boot() throws Exception {
public void setupSlave(int masterPort) throws SQLException {
Connection master = DriverManager.getConnection("jdbc:mysql://127.0.0.1:" + masterPort + "/mysql?useSSL=false", "root", "");
- ResultSet rs = master.createStatement().executeQuery("show master status");
+
+ // MySQL 8.4+ renamed SHOW MASTER STATUS to SHOW BINARY LOG STATUS and replication commands
+ String version = System.getProperty("mysql.image", "mysql:8.0");
+ boolean isMySQL84Plus = version.contains("8.4") || version.contains("8.5") || version.contains("9.");
+ String statusQuery = isMySQL84Plus ? "SHOW BINARY LOG STATUS" : "show master status";
+
+ ResultSet rs = master.createStatement().executeQuery(statusQuery);
if ( !rs.next() )
throw new RuntimeException("could not get master status");
@@ -153,25 +159,39 @@ public void setupSlave(int masterPort) throws SQLException {
Long position = rs.getLong("Position");
rs.close();
- String changeSQL = String.format(
- "CHANGE MASTER to master_host = '127.0.0.1', master_user='maxwell', master_password='maxwell', "
- + "master_log_file = '%s', master_log_pos = %d, master_port = %d",
- file, position, masterPort
- );
+ String changeSQL;
+ if (isMySQL84Plus) {
+ changeSQL = String.format(
+ "CHANGE REPLICATION SOURCE to SOURCE_HOST = '127.0.0.1', SOURCE_USER='maxwell', SOURCE_PASSWORD='maxwell', "
+ + "SOURCE_LOG_FILE = '%s', SOURCE_LOG_POS = %d, SOURCE_PORT = %d",
+ file, position, masterPort
+ );
+ } else {
+ changeSQL = String.format(
+ "CHANGE MASTER to master_host = '127.0.0.1', master_user='maxwell', master_password='maxwell', "
+ + "master_log_file = '%s', master_log_pos = %d, master_port = %d",
+ file, position, masterPort
+ );
+ }
logger.info("starting up slave: " + changeSQL);
getConnection().createStatement().execute(changeSQL);
- getConnection().createStatement().execute("START SLAVE");
+ String startCommand = isMySQL84Plus ? "START REPLICA" : "START SLAVE";
+ getConnection().createStatement().execute(startCommand);
rs.close();
- ResultSet status = query("show slave status");
+ String showStatusCommand = isMySQL84Plus ? "SHOW REPLICA STATUS" : "show slave status";
+ ResultSet status = query(showStatusCommand);
if ( !status.next() )
throw new RuntimeException("could not get slave status");
- if ( status.getString("Slave_IO_Running").equals("No")
- || status.getString("Slave_SQL_Running").equals("No")) {
- throw new RuntimeException("could not start slave: " + dumpQuery("show slave status"));
+ String ioRunningColumn = isMySQL84Plus ? "Replica_IO_Running" : "Slave_IO_Running";
+ String sqlRunningColumn = isMySQL84Plus ? "Replica_SQL_Running" : "Slave_SQL_Running";
+
+ if ( status.getString(ioRunningColumn).equals("No")
+ || status.getString(sqlRunningColumn).equals("No")) {
+ throw new RuntimeException("could not start slave: " + dumpQuery(showStatusCommand));
}
status.close();
@@ -271,17 +291,25 @@ private static String getVersionString() {
}
public void waitForSlaveToBeCurrent(MysqlOnetimeServer master) throws Exception {
- ResultSet ms = master.query("show master status");
+ // MySQL 8.4+ renamed SHOW MASTER STATUS to SHOW BINARY LOG STATUS
+ String version = System.getProperty("mysql.image", "mysql:8.0");
+ boolean isMySQL84Plus = version.contains("8.4") || version.contains("8.5") || version.contains("9.");
+ String masterStatusQuery = isMySQL84Plus ? "SHOW BINARY LOG STATUS" : "show master status";
+ String slaveStatusQuery = isMySQL84Plus ? "SHOW REPLICA STATUS" : "show slave status";
+ String relayMasterLogFileColumn = isMySQL84Plus ? "Relay_Source_Log_File" : "Relay_Master_Log_File";
+ String execMasterLogPosColumn = isMySQL84Plus ? "Exec_Source_Log_Pos" : "Exec_Master_Log_Pos";
+
+ ResultSet ms = master.query(masterStatusQuery);
ms.next();
String masterFile = ms.getString("File");
Long masterPos = ms.getLong("Position");
ms.close();
while ( true ) {
- ResultSet rs = query("show slave status");
+ ResultSet rs = query(slaveStatusQuery);
rs.next();
- if ( rs.getString("Relay_Master_Log_File").equals(masterFile) &&
- rs.getLong("Exec_Master_Log_Pos") >= masterPos )
+ if ( rs.getString(relayMasterLogFileColumn).equals(masterFile) &&
+ rs.getLong(execMasterLogPosColumn) >= masterPos )
return;
Thread.sleep(200);
diff --git a/src/test/java/com/github/shyiko/mysql/binlog/TestDatabaseContainer.java b/src/test/java/com/github/shyiko/mysql/binlog/TestDatabaseContainer.java
index 15644d0a..4cf7602a 100644
--- a/src/test/java/com/github/shyiko/mysql/binlog/TestDatabaseContainer.java
+++ b/src/test/java/com/github/shyiko/mysql/binlog/TestDatabaseContainer.java
@@ -45,8 +45,14 @@ public TestDatabaseContainer() {
* Creates a new test database container with specific options.
*/
public TestDatabaseContainer(TestDatabaseContainerOptions options) {
- // Use version from options first, then fall back to system property
- String version = options.version != null ? options.version : getImageFromSystemProperty();
+ // Use version from options first, then fall back to system property, then default
+ String version = options.version;
+ if (version == null || version.isEmpty()) {
+ version = getImageFromSystemProperty();
+ }
+ if (version == null || version.isEmpty()) {
+ version = "mysql:8.0"; // Default version
+ }
logger.info("Using database image: " + version);
@@ -54,18 +60,66 @@ public TestDatabaseContainer(TestDatabaseContainerOptions options) {
if (version.toLowerCase().contains("mariadb")) {
this.databaseType = DatabaseType.MARIADB;
DockerImageName imageName = DockerImageName.parse(version).asCompatibleSubstituteFor("mariadb");
+
+ List commands = new java.util.ArrayList<>();
+ commands.add("--log-bin=master");
+ commands.add("--binlog_format=row");
+ commands.add("--server-id=" + options.serverID);
+ commands.add("--default-time-zone=+00:00");
+ commands.add("--max_allowed_packet=16M");
+
+ if (options.extraParams != null && !options.extraParams.isEmpty()) {
+ String[] extraParamArray = options.extraParams.trim().split("\\s+");
+ commands.addAll(Arrays.asList(extraParamArray));
+ }
+
this.container = new MariaDBContainer<>(imageName)
.withDatabaseName("mysql")
.withUsername("root")
- .withPassword("");
+ .withPassword("")
+ .withCommand(commands.toArray(new String[0]));
} else {
this.databaseType = DatabaseType.MYSQL;
DockerImageName imageName = DockerImageName.parse(version).asCompatibleSubstituteFor("mysql");
+
+ // Build command parameters
+ List commands = new java.util.ArrayList<>();
+
+ // Configure container based on options
+ if (options.gtid) {
+ logger.info("Enabling GTID mode");
+ commands.add("--gtid-mode=ON");
+ commands.add("--log-slave-updates=ON");
+ commands.add("--enforce-gtid-consistency=true");
+ }
+
+ commands.add("--log-bin=master");
+ commands.add("--binlog_format=row");
+ commands.add("--server-id=" + options.serverID);
+ commands.add("--default-time-zone=+00:00");
+
+ // Add full row metadata for MySQL 8.0+
+ if (options.fullRowMetaData && !version.toLowerCase().startsWith("mariadb")) {
+ MysqlVersion mysqlVersion = parseVersion(version);
+ if (mysqlVersion.atLeast(8, 0)) {
+ commands.add("--binlog-row-metadata=FULL");
+ }
+ }
+
+ // Allow larger data packets for testing
+ commands.add("--max_allowed_packet=16M");
+
+ // Add extra parameters if provided
+ if (options.extraParams != null && !options.extraParams.isEmpty()) {
+ String[] extraParamArray = options.extraParams.trim().split("\\s+");
+ commands.addAll(Arrays.asList(extraParamArray));
+ }
+
this.container = new MySQLContainer<>(imageName)
.withDatabaseName("mysql")
.withUsername("root")
.withPassword("")
- .withCommand("--default-time-zone=+00:00");
+ .withCommand(commands.toArray(new String[0]));
}
// Configure network if provided (for master-slave setups)
@@ -75,47 +129,14 @@ public TestDatabaseContainer(TestDatabaseContainerOptions options) {
container.withNetworkAliases(options.networkAlias);
}
}
-
- // Build all command parameters together
- List commands = new java.util.ArrayList<>();
-
- // Configure container based on options
- if (options.gtid) {
- logger.info("Enabling GTID mode");
- commands.add("--gtid-mode=ON");
- commands.add("--log-slave-updates=ON");
- commands.add("--enforce-gtid-consistency=true");
- }
-
- commands.add("--log-bin=master");
- commands.add("--binlog_format=row");
- commands.add("--server-id=" + options.serverID);
- commands.add("--default-time-zone=+00:00");
-
- // Add full row metadata for MySQL 8.0+
- if (options.fullRowMetaData && !version.toLowerCase().startsWith("mariadb")) {
- MysqlVersion mysqlVersion = parseVersion(version);
- if (mysqlVersion.atLeast(8, 0)) {
- commands.add("--binlog-row-metadata=FULL");
- }
- }
-
- // Allow larger data packets for testing
- commands.add("--max_allowed_packet=16M");
-
- // Add extra parameters if provided
- if (options.extraParams != null && !options.extraParams.isEmpty()) {
- String[] extraParamArray = options.extraParams.trim().split("\\s+");
- commands.addAll(Arrays.asList(extraParamArray));
- }
-
- // Apply all commands at once
- container.withCommand(commands.toArray(new String[0]));
}
private static String getImageFromSystemProperty() {
String mysqlImage = System.getProperty("mysql.image");
- return mysqlImage == null ? "mysql:8.0" : mysqlImage;
+ String result = mysqlImage == null ? "mysql:8.0" : mysqlImage;
+ Logger.getLogger(TestDatabaseContainer.class.getName()).info(
+ "mysql.image system property: " + mysqlImage + " -> using: " + result);
+ return result;
}
private static MysqlVersion parseVersion(String imageTag) {
@@ -156,8 +177,16 @@ public void start() throws Exception {
}
if (databaseType == DatabaseType.MYSQL) {
- this.connection.createStatement().executeUpdate(
- "CREATE USER 'maxwell'@'%' IDENTIFIED WITH mysql_native_password BY 'maxwell'");
+ // MySQL 8.4+ removed mysql_native_password plugin, use default authentication
+ String version = getImageFromSystemProperty();
+ MysqlVersion mysqlVersion = parseVersion(version);
+ if (mysqlVersion.atLeast(8, 4)) {
+ this.connection.createStatement().executeUpdate(
+ "CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell'");
+ } else {
+ this.connection.createStatement().executeUpdate(
+ "CREATE USER 'maxwell'@'%' IDENTIFIED WITH mysql_native_password BY 'maxwell'");
+ }
} else {
this.connection.createStatement().executeUpdate(
"CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell'");
@@ -307,7 +336,13 @@ public String dumpQuery(String query) throws SQLException {
*/
public void setupSlave(TestDatabaseContainer master) throws SQLException {
Connection masterConn = master.getConnection();
- ResultSet rs = masterConn.createStatement().executeQuery("SHOW MASTER STATUS");
+
+ // MySQL 8.4+ renamed SHOW MASTER STATUS to SHOW BINARY LOG STATUS
+ String version = getImageFromSystemProperty();
+ MysqlVersion mysqlVersion = parseVersion(version);
+ String statusQuery = mysqlVersion.atLeast(8, 4) ? "SHOW BINARY LOG STATUS" : "SHOW MASTER STATUS";
+
+ ResultSet rs = masterConn.createStatement().executeQuery(statusQuery);
if (!rs.next()) {
throw new RuntimeException("Could not get master status");
}
@@ -323,25 +358,75 @@ public void setupSlave(TestDatabaseContainer master) throws SQLException {
logger.info("Master container IP: " + masterHost);
- String changeSQL = String.format(
- "CHANGE MASTER TO master_host = '%s', master_user='maxwell', master_password='maxwell', " +
- "master_log_file = '%s', master_log_pos = %d, master_port = %d",
- masterHost, file, position, 3306
- );
+ // MySQL 8.4+ renamed replication commands
+ String versionStr = getImageFromSystemProperty();
+ MysqlVersion mysqlVer = parseVersion(versionStr);
+ boolean isMySQL84Plus = mysqlVer.atLeast(8, 4);
+
+ String changeSQL;
+ String startCommand;
+ String showStatusCommand;
+ String ioRunningColumn;
+ String sqlRunningColumn;
+
+ if (isMySQL84Plus) {
+ changeSQL = String.format(
+ "CHANGE REPLICATION SOURCE TO SOURCE_HOST = '%s', SOURCE_USER='maxwell', SOURCE_PASSWORD='maxwell', " +
+ "SOURCE_LOG_FILE = '%s', SOURCE_LOG_POS = %d, SOURCE_PORT = %d, GET_SOURCE_PUBLIC_KEY = 1",
+ masterHost, file, position, 3306
+ );
+ startCommand = "START REPLICA";
+ showStatusCommand = "SHOW REPLICA STATUS";
+ ioRunningColumn = "Replica_IO_Running";
+ sqlRunningColumn = "Replica_SQL_Running";
+ } else {
+ if (databaseType == DatabaseType.MARIADB) {
+ changeSQL = String.format(
+ "CHANGE MASTER TO master_host = '%s', master_user='maxwell', master_password='maxwell', " +
+ "master_log_file = '%s', master_log_pos = %d, master_port = %d",
+ masterHost, file, position, 3306
+ );
+ } else {
+ changeSQL = String.format(
+ "CHANGE MASTER TO master_host = '%s', master_user='maxwell', master_password='maxwell', " +
+ "master_log_file = '%s', master_log_pos = %d, master_port = %d, GET_MASTER_PUBLIC_KEY = 1",
+ masterHost, file, position, 3306
+ );
+ }
+ startCommand = "START SLAVE";
+ showStatusCommand = "SHOW SLAVE STATUS";
+ ioRunningColumn = "Slave_IO_Running";
+ sqlRunningColumn = "Slave_SQL_Running";
+ }
logger.info("Starting up slave: " + changeSQL);
getConnection().createStatement().execute(changeSQL);
- getConnection().createStatement().execute("START SLAVE");
+ getConnection().createStatement().execute(startCommand);
- ResultSet status = query("SHOW SLAVE STATUS");
+ // Wait a moment for replication to start before checking status
+ // The IO thread needs time to establish connection to the master
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while waiting for replication to start", e);
+ }
+
+ ResultSet status = query(showStatusCommand);
if (!status.next()) {
throw new RuntimeException("Could not get slave status");
}
- if (status.getString("Slave_IO_Running").equals("No") ||
- status.getString("Slave_SQL_Running").equals("No")) {
- throw new RuntimeException("Could not start slave: " + dumpQuery("SHOW SLAVE STATUS"));
+ String ioRunning = status.getString(ioRunningColumn);
+ String sqlRunning = status.getString(sqlRunningColumn);
+
+ // IO thread may still be "Connecting" initially, which is acceptable
+ // Only fail if it's explicitly "No" (error state)
+ if (ioRunning.equals("No") || sqlRunning.equals("No")) {
+ throw new RuntimeException("Could not start slave: " + dumpQuery(showStatusCommand));
}
+
+ logger.info("Slave replication started: IO=" + ioRunning + ", SQL=" + sqlRunning);
status.close();
}
@@ -349,7 +434,12 @@ public void setupSlave(TestDatabaseContainer master) throws SQLException {
* Waits for the slave to catch up with the master using Awaitility.
*/
public void waitForSlaveToBeCurrent(TestDatabaseContainer master) throws Exception {
- ResultSet ms = master.query("SHOW MASTER STATUS");
+ // MySQL 8.4+ renamed SHOW MASTER STATUS to SHOW BINARY LOG STATUS
+ String version = getImageFromSystemProperty();
+ MysqlVersion mysqlVersion = parseVersion(version);
+ String statusQuery = mysqlVersion.atLeast(8, 4) ? "SHOW BINARY LOG STATUS" : "SHOW MASTER STATUS";
+
+ ResultSet ms = master.query(statusQuery);
ms.next();
final String masterFile = ms.getString("File");
final Long masterPos = ms.getLong("Position");
@@ -357,28 +447,36 @@ public void waitForSlaveToBeCurrent(TestDatabaseContainer master) throws Excepti
logger.info("Waiting for slave to catch up: master file=" + masterFile + ", position=" + masterPos);
+ // MySQL 8.4+ renamed SHOW SLAVE STATUS to SHOW REPLICA STATUS
+ final boolean isMySQL84Plus = mysqlVersion.atLeast(8, 4);
+ final String showStatusCommand = isMySQL84Plus ? "SHOW REPLICA STATUS" : "SHOW SLAVE STATUS";
+ final String ioRunningColumn = isMySQL84Plus ? "Replica_IO_Running" : "Slave_IO_Running";
+ final String sqlRunningColumn = isMySQL84Plus ? "Replica_SQL_Running" : "Slave_SQL_Running";
+ final String relayMasterLogFileColumn = isMySQL84Plus ? "Relay_Source_Log_File" : "Relay_Master_Log_File";
+ final String execMasterLogPosColumn = isMySQL84Plus ? "Exec_Source_Log_Pos" : "Exec_Master_Log_Pos";
+
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofMillis(200))
.pollDelay(Duration.ZERO)
.ignoreExceptions()
.until(() -> {
- try (ResultSet rs = query("SHOW SLAVE STATUS")) {
+ try (ResultSet rs = query(showStatusCommand)) {
if (!rs.next()) {
logger.warning("Slave status not available");
return false;
}
- String slaveIORunning = rs.getString("Slave_IO_Running");
- String slaveSQLRunning = rs.getString("Slave_SQL_Running");
- String relayMasterLogFile = rs.getString("Relay_Master_Log_File");
- Long execMasterLogPos = rs.getLong("Exec_Master_Log_Pos");
+ String slaveIORunning = rs.getString(ioRunningColumn);
+ String slaveSQLRunning = rs.getString(sqlRunningColumn);
+ String relayMasterLogFile = rs.getString(relayMasterLogFileColumn);
+ Long execMasterLogPos = rs.getLong(execMasterLogPosColumn);
logger.info("Slave status: IO=" + slaveIORunning + ", SQL=" + slaveSQLRunning +
", file=" + relayMasterLogFile + ", pos=" + execMasterLogPos);
if (!"Yes".equals(slaveIORunning) || !"Yes".equals(slaveSQLRunning)) {
- throw new RuntimeException("Slave replication not running: " + dumpQuery("SHOW SLAVE STATUS"));
+ throw new RuntimeException("Slave replication not running: " + dumpQuery(showStatusCommand));
}
boolean caughtUp = relayMasterLogFile.equals(masterFile) && execMasterLogPos >= masterPos;
diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/EventTypeTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/EventTypeTest.java
index df1e1718..58ea74d4 100644
--- a/src/test/java/com/github/shyiko/mysql/binlog/event/EventTypeTest.java
+++ b/src/test/java/com/github/shyiko/mysql/binlog/event/EventTypeTest.java
@@ -74,7 +74,7 @@ public void testIsDelete() throws Exception {
public void testByEventNumber() {
assertEquals(EventType.byEventNumber(0), EventType.UNKNOWN);
assertNull(EventType.byEventNumber(-1));
- assertNull(EventType.byEventNumber(41));
+ assertNull(EventType.byEventNumber(43));
assertNull(EventType.byEventNumber(159));
assertNull(EventType.byEventNumber(164));
assertEquals(EventType.byEventNumber(1), EventType.START_V3);
diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/MySqlGtidTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/MySqlGtidTest.java
new file mode 100644
index 00000000..ffce2f77
--- /dev/null
+++ b/src/test/java/com/github/shyiko/mysql/binlog/event/MySqlGtidTest.java
@@ -0,0 +1,190 @@
+package com.github.shyiko.mysql.binlog.event;
+
+import org.testng.annotations.Test;
+
+import java.util.UUID;
+
+import static org.testng.Assert.*;
+
+/**
+ * Unit tests for MySqlGtid class, including MySQL 8.3+ tagged GTID support.
+ */
+public class MySqlGtidTest {
+
+ @Test
+ public void testParseLegacyFormat() {
+ // Test backward compatibility with non-tagged GTIDs
+ final MySqlGtid gtid = MySqlGtid.fromString("24bc7850-2c16-11e6-a073-0242ac110002:11");
+
+ assertNull(gtid.getTag());
+ assertEquals(gtid.getServerId().toString(), "24bc7850-2c16-11e6-a073-0242ac110002");
+ assertEquals(gtid.getTransactionId(), 11L);
+ assertEquals(gtid.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:11");
+ }
+
+ @Test
+ public void testParseTaggedFormat() {
+ // Test MySQL 8.3+ tagged GTID format
+ final MySqlGtid gtid = MySqlGtid.fromString("24bc7850-2c16-11e6-a073-0242ac110002:mytag:11");
+
+ assertEquals(gtid.getTag(), "mytag");
+ assertEquals(gtid.getServerId().toString(), "24bc7850-2c16-11e6-a073-0242ac110002");
+ assertEquals(gtid.getTransactionId(), 11L);
+ assertEquals(gtid.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:mytag:11");
+ }
+
+ @Test
+ public void testParseTaggedFormatWithComplexTag() {
+ // Test with more complex tag names
+ final MySqlGtid gtid = MySqlGtid.fromString("aae57b2f-8e44-11ee-a3d6-a036bcda1a41:prod_db_01:42");
+
+ assertEquals(gtid.getTag(), "prod_db_01");
+ assertEquals(gtid.getServerId().toString(), "aae57b2f-8e44-11ee-a3d6-a036bcda1a41");
+ assertEquals(gtid.getTransactionId(), 42L);
+ assertEquals(gtid.toString(), "aae57b2f-8e44-11ee-a3d6-a036bcda1a41:prod_db_01:42");
+ }
+
+ @Test
+ public void testParseTagFirstFormatForBackwardCompatibility() {
+ // Accept the previous tag-first representation for backward compatibility.
+ final MySqlGtid gtid = MySqlGtid.fromString("12345:994ab859-8ea8-11ee-a568-a036bcda1a41:3");
+
+ assertEquals(gtid.getTag(), "12345");
+ assertEquals(gtid.getServerId().toString(), "994ab859-8ea8-11ee-a568-a036bcda1a41");
+ assertEquals(gtid.getTransactionId(), 3L);
+ assertEquals(gtid.toString(), "994ab859-8ea8-11ee-a568-a036bcda1a41:12345:3");
+ }
+
+ @Test
+ public void testParseTaggedFormatWithUnderscoreTag() {
+ // Test with underscore in tag
+ final MySqlGtid gtid = MySqlGtid.fromString("bd9794e0-1d65-11ed-a7e7-0adb305b3a12:my_tag_123:9");
+
+ assertEquals(gtid.getTag(), "my_tag_123");
+ assertEquals(gtid.getServerId().toString(), "bd9794e0-1d65-11ed-a7e7-0adb305b3a12");
+ assertEquals(gtid.getTransactionId(), 9L);
+ assertEquals(gtid.toString(), "bd9794e0-1d65-11ed-a7e7-0adb305b3a12:my_tag_123:9");
+ }
+
+ @Test
+ public void testConstructorWithoutTag() {
+ // Test backward compatible constructor
+ final UUID uuid = UUID.fromString("24bc7850-2c16-11e6-a073-0242ac110002");
+ final MySqlGtid gtid = new MySqlGtid(uuid, 11L);
+
+ assertNull(gtid.getTag());
+ assertEquals(gtid.getServerId(), uuid);
+ assertEquals(gtid.getTransactionId(), 11L);
+ assertEquals(gtid.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:11");
+ }
+
+ @Test
+ public void testConstructorWithTag() {
+ // Test new constructor with tag
+ final UUID uuid = UUID.fromString("24bc7850-2c16-11e6-a073-0242ac110002");
+ final MySqlGtid gtid = new MySqlGtid("mytag", uuid, 11L);
+
+ assertEquals(gtid.getTag(), "mytag");
+ assertEquals(gtid.getServerId(), uuid);
+ assertEquals(gtid.getTransactionId(), 11L);
+ assertEquals(gtid.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:mytag:11");
+ }
+
+ @Test
+ public void testConstructorWithEmptyTag() {
+ // Test that empty tag is treated as no tag in toString
+ final UUID uuid = UUID.fromString("24bc7850-2c16-11e6-a073-0242ac110002");
+ final MySqlGtid gtid = new MySqlGtid("", uuid, 11L);
+
+ assertEquals(gtid.getTag(), "");
+ assertEquals(gtid.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:11");
+ }
+
+ @Test
+ public void testConstructorWithNullTag() {
+ // Test that null tag works correctly
+ final UUID uuid = UUID.fromString("24bc7850-2c16-11e6-a073-0242ac110002");
+ final MySqlGtid gtid = new MySqlGtid(null, uuid, 11L);
+
+ assertNull(gtid.getTag());
+ assertEquals(gtid.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:11");
+ }
+
+ @Test
+ public void testParseLegacyFormatWithLargeTransactionId() {
+ // Test with large transaction ID
+ final MySqlGtid gtid = MySqlGtid.fromString("24bc7850-2c16-11e6-a073-0242ac110002:9223372036854775807");
+
+ assertNull(gtid.getTag());
+ assertEquals(gtid.getServerId().toString(), "24bc7850-2c16-11e6-a073-0242ac110002");
+ assertEquals(gtid.getTransactionId(), Long.MAX_VALUE);
+ }
+
+ @Test
+ public void testParseTaggedFormatWithLargeTransactionId() {
+ // Test tagged format with large transaction ID
+ final MySqlGtid gtid = MySqlGtid.fromString("24bc7850-2c16-11e6-a073-0242ac110002:tag:9223372036854775807");
+
+ assertEquals(gtid.getTag(), "tag");
+ assertEquals(gtid.getServerId().toString(), "24bc7850-2c16-11e6-a073-0242ac110002");
+ assertEquals(gtid.getTransactionId(), Long.MAX_VALUE);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testParseInvalidFormatTooFewParts() {
+ // Test error handling for invalid format (only UUID, no transaction ID)
+ MySqlGtid.fromString("24bc7850-2c16-11e6-a073-0242ac110002");
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testParseInvalidFormatTooManyParts() {
+ // Test error handling for too many parts
+ MySqlGtid.fromString("tag:uuid:123:extra");
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testParseInvalidUuidInLegacyFormat() {
+ // Test error handling for invalid UUID in legacy format
+ MySqlGtid.fromString("invalid-uuid:11");
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testParseInvalidUuidInTaggedFormat() {
+ // Test error handling for invalid UUID in tagged format
+ MySqlGtid.fromString("invalid-uuid:mytag:11");
+ }
+
+ @Test(expectedExceptions = NumberFormatException.class)
+ public void testParseInvalidTransactionIdInLegacyFormat() {
+ // Test error handling for invalid transaction ID in legacy format
+ MySqlGtid.fromString("24bc7850-2c16-11e6-a073-0242ac110002:notanumber");
+ }
+
+ @Test(expectedExceptions = NumberFormatException.class)
+ public void testParseInvalidTransactionIdInTaggedFormat() {
+ // Test error handling for invalid transaction ID in tagged format
+ MySqlGtid.fromString("24bc7850-2c16-11e6-a073-0242ac110002:mytag:notanumber");
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testParseEmptyString() {
+ // Test error handling for empty string
+ MySqlGtid.fromString("");
+ }
+
+ @Test
+ public void testRoundTripLegacyFormat() {
+ // Test that parsing and toString are inverse operations for legacy format
+ final String original = "24bc7850-2c16-11e6-a073-0242ac110002:11";
+ final MySqlGtid gtid = MySqlGtid.fromString(original);
+ assertEquals(gtid.toString(), original);
+ }
+
+ @Test
+ public void testRoundTripTaggedFormat() {
+ // Test that parsing and toString are inverse operations for tagged format
+ final String original = "24bc7850-2c16-11e6-a073-0242ac110002:mytag:11";
+ final MySqlGtid gtid = MySqlGtid.fromString(original);
+ assertEquals(gtid.toString(), original);
+ }
+}
diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/GtidTaggedEventDataDeserializerTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/GtidTaggedEventDataDeserializerTest.java
new file mode 100644
index 00000000..8851399e
--- /dev/null
+++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/GtidTaggedEventDataDeserializerTest.java
@@ -0,0 +1,329 @@
+package com.github.shyiko.mysql.binlog.event.deserialization;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+import com.github.shyiko.mysql.binlog.event.GtidTaggedEventData;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import org.testng.annotations.Test;
+
+public class GtidTaggedEventDataDeserializerTest {
+
+ private final GtidTaggedEventDataDeserializer deserializer = new GtidTaggedEventDataDeserializer();
+
+ @Test
+ public void testDeserializeWithTag() throws IOException {
+ final GtidTaggedEventData data = deserialize(eventBuilder()
+ .flags(1)
+ .uuid("aae57b2f-8e44-11ee-a3d6-a036bcda1a41")
+ .gno(4)
+ .tag("test_tag")
+ .lastCommitted(3)
+ .sequenceNumber(4)
+ .immediateCommitTimestamp(1701215692713879L)
+ .transactionLength(0)
+ .immediateServerVersion(999999)
+ .build());
+
+ assertEquals(data.getFlags(), 0x01);
+ assertEquals(data.getMySqlGtid().toString(), "aae57b2f-8e44-11ee-a3d6-a036bcda1a41:test_tag:4");
+ assertEquals(data.getMySqlGtid().getTag(), "test_tag");
+ assertEquals(data.getLastCommitted(), 3);
+ assertEquals(data.getSequenceNumber(), 4);
+ assertEquals(data.getImmediateCommitTimestamp(), 1701215692713879L);
+ assertEquals(data.getOriginalCommitTimestamp(), 1701215692713879L);
+ assertEquals(data.getTransactionLength(), 0);
+ assertEquals(data.getImmediateServerVersion(), 999999);
+ assertEquals(data.getOriginalServerVersion(), 999999);
+ assertEquals(data.getCommitGroupTicket(), 0);
+ }
+
+ @Test
+ public void testDeserializeWithoutTag() throws IOException {
+ final GtidTaggedEventData data = deserialize(eventBuilder()
+ .flags(0)
+ .uuid("994ab859-8ea8-11ee-a568-a036bcda1a41")
+ .gno(3)
+ .tag("")
+ .lastCommitted(2)
+ .sequenceNumber(3)
+ .immediateCommitTimestamp(1701257014433088L)
+ .transactionLength(308)
+ .immediateServerVersion(999999)
+ .build());
+
+ assertEquals(data.getFlags(), 0x00);
+ assertEquals(data.getMySqlGtid().toString(), "994ab859-8ea8-11ee-a568-a036bcda1a41:3");
+ assertNull(data.getMySqlGtid().getTag());
+ assertEquals(data.getLastCommitted(), 2);
+ assertEquals(data.getSequenceNumber(), 3);
+ assertEquals(data.getImmediateCommitTimestamp(), 1701257014433088L);
+ assertEquals(data.getOriginalCommitTimestamp(), 1701257014433088L);
+ assertEquals(data.getTransactionLength(), 308);
+ assertEquals(data.getImmediateServerVersion(), 999999);
+ assertEquals(data.getOriginalServerVersion(), 999999);
+ assertEquals(data.getCommitGroupTicket(), 0);
+ }
+
+ @Test
+ public void testDeserializeWithFullFields() throws IOException {
+ final GtidTaggedEventData data = deserialize(eventBuilder()
+ .flags(0)
+ .uuid("bd9794e0-1d65-11ed-a7e7-0adb305b3a12")
+ .gno(9)
+ .tag("prod")
+ .lastCommitted(7)
+ .sequenceNumber(8)
+ .immediateCommitTimestamp(1699112309893478L)
+ .transactionLength(315)
+ .immediateServerVersion(80100)
+ .commitGroupTicket(5)
+ .build());
+
+ assertEquals(data.getFlags(), 0x00);
+ assertEquals(data.getMySqlGtid().toString(), "bd9794e0-1d65-11ed-a7e7-0adb305b3a12:prod:9");
+ assertEquals(data.getMySqlGtid().getTag(), "prod");
+ assertEquals(data.getLastCommitted(), 7);
+ assertEquals(data.getSequenceNumber(), 8);
+ assertEquals(data.getImmediateCommitTimestamp(), 1699112309893478L);
+ assertEquals(data.getOriginalCommitTimestamp(), 1699112309893478L);
+ assertEquals(data.getTransactionLength(), 315);
+ assertEquals(data.getImmediateServerVersion(), 80100);
+ assertEquals(data.getOriginalServerVersion(), 80100);
+ assertEquals(data.getCommitGroupTicket(), 5);
+ }
+
+ @Test
+ public void testDeserializeWithDifferentOriginalTimestamp() throws IOException {
+ final GtidTaggedEventData data = deserialize(eventBuilder()
+ .flags(1)
+ .uuid("aae57b2f-8e44-11ee-a3d6-a036bcda1a41")
+ .gno(4)
+ .tag("replica")
+ .lastCommitted(3)
+ .sequenceNumber(4)
+ .immediateCommitTimestamp(1701215692713879L)
+ .originalCommitTimestamp(1701215692713878L)
+ .transactionLength(0)
+ .immediateServerVersion(999999)
+ .build());
+
+ assertEquals(data.getFlags(), 0x01);
+ assertEquals(data.getMySqlGtid().toString(), "aae57b2f-8e44-11ee-a3d6-a036bcda1a41:replica:4");
+ assertEquals(data.getMySqlGtid().getTag(), "replica");
+ assertEquals(data.getLastCommitted(), 3);
+ assertEquals(data.getSequenceNumber(), 4);
+ assertEquals(data.getImmediateCommitTimestamp(), 1701215692713879L);
+ assertEquals(data.getOriginalCommitTimestamp(), 1701215692713878L);
+ }
+
+ @Test
+ public void testDeserializeWithDifferentOriginalServerVersion() throws IOException {
+ final GtidTaggedEventData data = deserialize(eventBuilder()
+ .flags(0)
+ .uuid("bd9794e0-1d65-11ed-a7e7-0adb305b3a12")
+ .gno(9)
+ .tag("staging")
+ .lastCommitted(7)
+ .sequenceNumber(8)
+ .immediateCommitTimestamp(1699112309893478L)
+ .transactionLength(315)
+ .immediateServerVersion(80100)
+ .originalServerVersion(80099)
+ .build());
+
+ assertEquals(data.getFlags(), 0x00);
+ assertEquals(data.getMySqlGtid().toString(), "bd9794e0-1d65-11ed-a7e7-0adb305b3a12:staging:9");
+ assertEquals(data.getMySqlGtid().getTag(), "staging");
+ assertEquals(data.getLastCommitted(), 7);
+ assertEquals(data.getSequenceNumber(), 8);
+ assertEquals(data.getImmediateCommitTimestamp(), 1699112309893478L);
+ assertEquals(data.getOriginalCommitTimestamp(), 1699112309893478L);
+ assertEquals(data.getTransactionLength(), 315);
+ assertEquals(data.getImmediateServerVersion(), 80100);
+ assertEquals(data.getOriginalServerVersion(), 80099);
+ }
+
+ private GtidTaggedEventData deserialize(byte[] bytes) throws IOException {
+ return deserializer.deserialize(new ByteArrayInputStream(bytes));
+ }
+
+ private static EventBuilder eventBuilder() {
+ return new EventBuilder();
+ }
+
+ private static final class EventBuilder {
+ private int flags;
+ private UUID uuid;
+ private long gno;
+ private String tag = "";
+ private long lastCommitted;
+ private long sequenceNumber;
+ private long immediateCommitTimestamp;
+ private Long originalCommitTimestamp;
+ private long transactionLength;
+ private int immediateServerVersion;
+ private Integer originalServerVersion;
+ private Long commitGroupTicket;
+
+ private EventBuilder flags(int flags) {
+ this.flags = flags;
+ return this;
+ }
+
+ private EventBuilder uuid(String uuid) {
+ this.uuid = UUID.fromString(uuid);
+ return this;
+ }
+
+ private EventBuilder gno(long gno) {
+ this.gno = gno;
+ return this;
+ }
+
+ private EventBuilder tag(String tag) {
+ this.tag = tag;
+ return this;
+ }
+
+ private EventBuilder lastCommitted(long lastCommitted) {
+ this.lastCommitted = lastCommitted;
+ return this;
+ }
+
+ private EventBuilder sequenceNumber(long sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ return this;
+ }
+
+ private EventBuilder immediateCommitTimestamp(long immediateCommitTimestamp) {
+ this.immediateCommitTimestamp = immediateCommitTimestamp;
+ return this;
+ }
+
+ private EventBuilder originalCommitTimestamp(long originalCommitTimestamp) {
+ this.originalCommitTimestamp = originalCommitTimestamp;
+ return this;
+ }
+
+ private EventBuilder transactionLength(long transactionLength) {
+ this.transactionLength = transactionLength;
+ return this;
+ }
+
+ private EventBuilder immediateServerVersion(int immediateServerVersion) {
+ this.immediateServerVersion = immediateServerVersion;
+ return this;
+ }
+
+ private EventBuilder originalServerVersion(int originalServerVersion) {
+ this.originalServerVersion = originalServerVersion;
+ return this;
+ }
+
+ private EventBuilder commitGroupTicket(long commitGroupTicket) {
+ this.commitGroupTicket = commitGroupTicket;
+ return this;
+ }
+
+ private byte[] build() throws IOException {
+ final java.io.ByteArrayOutputStream fields = new java.io.ByteArrayOutputStream();
+ writeFieldId(fields, 0);
+ writeUnsignedVarLong(fields, flags);
+ writeFieldId(fields, 1);
+ writeUuid(fields, uuid);
+ writeFieldId(fields, 2);
+ writeSignedVarLong(fields, gno);
+ writeFieldId(fields, 3);
+ writeString(fields, tag);
+ writeFieldId(fields, 4);
+ writeSignedVarLong(fields, lastCommitted);
+ writeFieldId(fields, 5);
+ writeSignedVarLong(fields, sequenceNumber);
+ writeFieldId(fields, 6);
+ writeUnsignedVarLong(fields, immediateCommitTimestamp);
+ if (originalCommitTimestamp != null) {
+ writeFieldId(fields, 7);
+ writeUnsignedVarLong(fields, originalCommitTimestamp);
+ }
+ writeFieldId(fields, 8);
+ writeUnsignedVarLong(fields, transactionLength);
+ writeFieldId(fields, 9);
+ writeUnsignedVarLong(fields, immediateServerVersion);
+ if (originalServerVersion != null) {
+ writeFieldId(fields, 10);
+ writeUnsignedVarLong(fields, originalServerVersion);
+ }
+ if (commitGroupTicket != null) {
+ writeFieldId(fields, 11);
+ writeUnsignedVarLong(fields, commitGroupTicket);
+ }
+
+ final int lastNonIgnorableFieldId = commitGroupTicket != null ? 12 : originalServerVersion != null ? 11 : 10;
+ int eventSize = fields.size();
+ int previousEventSize;
+ do {
+ previousEventSize = eventSize;
+ eventSize = fields.size() + sizeUnsignedVarLong(1) + sizeUnsignedVarLong(previousEventSize)
+ + sizeUnsignedVarLong(lastNonIgnorableFieldId);
+ } while (eventSize != previousEventSize);
+
+ final java.io.ByteArrayOutputStream event = new java.io.ByteArrayOutputStream();
+ writeUnsignedVarLong(event, 1);
+ writeUnsignedVarLong(event, eventSize);
+ writeUnsignedVarLong(event, lastNonIgnorableFieldId);
+ fields.writeTo(event);
+ return event.toByteArray();
+ }
+
+ private static void writeFieldId(java.io.ByteArrayOutputStream output, int fieldId) throws IOException {
+ writeUnsignedVarLong(output, fieldId);
+ }
+
+ private static void writeUuid(java.io.ByteArrayOutputStream output, UUID uuid) throws IOException {
+ writeUuidLong(output, uuid.getMostSignificantBits());
+ writeUuidLong(output, uuid.getLeastSignificantBits());
+ }
+
+ private static void writeString(java.io.ByteArrayOutputStream output, String value) throws IOException {
+ final byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
+ writeUnsignedVarLong(output, bytes.length);
+ output.write(bytes);
+ }
+
+ private static void writeSignedVarLong(java.io.ByteArrayOutputStream output, long value) throws IOException {
+ writeUnsignedVarLong(output, value >= 0 ? value << 1 : ((-value - 1) << 1) | 1);
+ }
+
+ private static void writeUnsignedVarLong(java.io.ByteArrayOutputStream output, long value) throws IOException {
+ final int byteCount = sizeUnsignedVarLong(value);
+ output.write((int) (((1L << (byteCount - 1)) - 1) | (value << byteCount)));
+ if (byteCount == 1) {
+ return;
+ }
+ long remaining = value >>> (8 - byteCount + ((byteCount + 7) >> 4));
+ for (int i = 0; i < byteCount - 1; ++i) {
+ output.write((int) (remaining & 0xff));
+ remaining >>>= 8;
+ }
+ }
+
+ private static int sizeUnsignedVarLong(long value) {
+ for (int byteCount = 1; byteCount < 9; ++byteCount) {
+ if ((value >>> (7 * byteCount)) == 0) {
+ return byteCount;
+ }
+ }
+ return 9;
+ }
+
+ private static void writeUuidLong(java.io.ByteArrayOutputStream output, long value) throws IOException {
+ for (int i = 7; i >= 0; --i) {
+ writeUnsignedVarLong(output, (value >>> (i << 3)) & 0xff);
+ }
+ }
+ }
+}
diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java
index 5a2eb616..5d80c9b6 100644
--- a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java
+++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java
@@ -69,7 +69,7 @@ public class JsonBinaryValueIntegrationTest {
private BinaryLogClient client;
private CountDownEventListener eventListener;
- private boolean isMaria = "mariadb".equals(System.getenv("MYSQL_VERSION"));
+ private boolean isMaria = TestDatabaseContainer.getVersion().isMaria;
private TestDatabaseContainer masterContainer;
@@ -607,7 +607,7 @@ public void tearDown() throws Exception {
master.execute(new BinaryLogClientIntegrationTest.Callback() {
@Override
public void execute(Statement statement) throws SQLException {
- statement.execute("drop database json_test");
+ statement.execute("drop database if exists json_test");
}
});
master.close();