diff --git a/.github/workflows/mysql-84.yml b/.github/workflows/mysql-84.yml new file mode 100644 index 00000000..8b8f8f32 --- /dev/null +++ b/.github/workflows/mysql-84.yml @@ -0,0 +1,44 @@ +# +# Copyright Debezium Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This workflow will build a Java project with Maven +# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven + +name: Build and run tests against MySQL 8.4 + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v6 + - uses: ./.github/actions/setup-java + - name: Cache Maven packages + uses: actions/cache@v4 + with: + path: ~/.m2 + key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} + restore-keys: ${{ runner.os }}-m2 + - name: Build with Maven + run: ./mvnw -B install -Pmysql-8.4 -Dgpg.skip=true --file pom.xml diff --git a/pom.xml b/pom.xml index 2e442015..c608ce25 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ io.debezium mysql-binlog-connector-java - 0.40.7 + 0.41.0 mysql-binlog-connector-java MySQL Binary Log connector @@ -241,6 +241,9 @@ **/*IntegrationTest.java + + ${mysql.image} + @@ -312,7 +315,7 @@ mysql-8.0 - container-registry.oracle.com/mysql/community-server:8.0 + mirror.gcr.io/mysql:8.0 @@ -321,6 +324,12 @@ mirror.gcr.io/library/mariadb:10.6 + + mysql-8.4 + + mirror.gcr.io/mysql:8.4 + + diff --git a/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java b/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java index b9c491a4..d722fdac 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java @@ -21,11 +21,11 @@ /** * GTID set as described in GTID - * Concepts of MySQL 5.6 Reference Manual. + * Concepts of MySQL 5.6 Reference Manual, with support for MySQL 8.3+ tagged GTID intervals. * *
  * gtid_set: uuid_set[,uuid_set]...
- * uuid_set: uuid:interval[:interval]...
+ * uuid_set: uuid[:tag]:interval[:interval]...
  * uuid: hhhhhhhh-hhhh-hhhh-hhhh-hhhhhhhhhhhh, h: [0-9|A-F]
  * interval: n[-n], (n >= 1)
  * 
@@ -34,7 +34,7 @@ */ public class GtidSet { - private final Map map = new LinkedHashMap(); + private final Map map = new LinkedHashMap(); public static GtidSet parse(String gtidStr) { if ( MariadbGtidSet.isMariaGtidSet(gtidStr) ) { @@ -45,30 +45,77 @@ public static GtidSet parse(String gtidStr) { } /** * @param gtidSet gtid set comprised of closed intervals (like MySQL's executed_gtid_set). + * Supports legacy UUID intervals (uuid:intervals), MySQL tagged GTID-set intervals + * (uuid:tag:intervals), and the previous tag-first representation (tag:uuid:intervals). */ public GtidSet(String gtidSet) { String[] uuidSets = (gtidSet == null || gtidSet.isEmpty()) ? new String[0] : gtidSet.replace("\n", "").split(","); for (String uuidSet : uuidSets) { - int uuidSeparatorIndex = uuidSet.indexOf(":"); - UUID sourceId = UUID.fromString(uuidSet.substring(0, uuidSeparatorIndex)); - List intervals = new ArrayList(); - String[] rawIntervals = uuidSet.substring(uuidSeparatorIndex + 1).split(":"); - for (String interval : rawIntervals) { - String[] is = interval.split("-"); - long[] split = new long[is.length]; - for (int i = 0, e = is.length; i < e; i++) { - split[i] = Long.parseLong(is[i]); - } - if (split.length == 1) { - split = new long[] {split[0], split[0]}; + final String[] parts = uuidSet.split(":"); + + // MySQL tagged GTID sets identify intervals by TSID (uuid[:tag]). + // Also accept the previous tag-first representation (tag:uuid:intervals). + int uuidIndex = 0; + int intervalsStartIndex = 1; + + // UUID format: 8-4-4-4-12 hex digits with dashes. If the first token is not a UUID, + // treat it as a tag from the previous tag-first representation. + if (parts.length >= 3 && !isValidUuidFormat(parts[0])) { + // Previous tag-first format: tag:uuid:intervals... + uuidIndex = 1; + intervalsStartIndex = 2; + } + + final UUID sourceId = UUID.fromString(parts[uuidIndex]); + String tag = uuidIndex == 1 ? parts[0] : null; + for (int i = intervalsStartIndex; i < parts.length; i++) { + final String part = parts[i]; + if (!isInterval(part)) { + tag = part; + continue; } - intervals.add(new Interval(split[0], split[1])); + addInterval(sourceId, tag, parseInterval(part)); } - map.put(sourceId, new UUIDSet(sourceId, intervals)); } } + private void addInterval(UUID sourceId, String tag, Interval interval) { + final Tsid tsid = new Tsid(sourceId, tag); + final UUIDSet existing = map.get(tsid); + final List intervals = existing == null ? new ArrayList() : + new ArrayList(existing.getIntervals()); + intervals.add(interval); + map.put(tsid, new UUIDSet(sourceId, tag, intervals)); + } + + private static Interval parseInterval(final String interval) { + final String[] is = interval.split("-"); + long[] split = new long[is.length]; + for (int j = 0, e = is.length; j < e; j++) { + split[j] = Long.parseLong(is[j]); + } + if (split.length == 1) { + split = new long[] {split[0], split[0]}; + } + return new Interval(split[0], split[1]); + } + + private static boolean isInterval(final String str) { + return str.matches("[0-9]+(-[0-9]+)?"); + } + + /** + * Checks if a string matches the UUID format (8-4-4-4-12 hex digits). + * + * @param str the string to check + * @return true if the string is a valid UUID format, false otherwise + */ + private static boolean isValidUuidFormat(final String str) { + // UUID format: 8-4-4-4-12 hex digits with dashes + return str.matches("[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"); + } + /** * Get an immutable collection of the {@link UUIDSet range of GTIDs for a single server}. * @return the {@link UUIDSet GTID ranges for each server}; never null @@ -78,22 +125,32 @@ public Collection getUUIDSets() { } /** - * Find the {@link UUIDSet} for the server with the specified UUID. + * Find the untagged {@link UUIDSet} for the server with the specified UUID. * @param uuid the UUID of the server * @return the {@link UUIDSet} for the identified server, or {@code null} if there are no GTIDs from that server. */ public UUIDSet getUUIDSet(String uuid) { - return map.get(UUID.fromString(uuid)); + return map.get(new Tsid(UUID.fromString(uuid), null)); + } + + /** + * Find the {@link UUIDSet} for the server with the specified UUID and tag. + * @param uuid the UUID of the server + * @param tag the GTID tag, or {@code null} for untagged GTIDs + * @return the {@link UUIDSet} for the identified TSID, or {@code null} if there are no GTIDs for that TSID. + */ + public UUIDSet getUUIDSet(String uuid, String tag) { + return map.get(new Tsid(UUID.fromString(uuid), tag)); } /** * Add or replace the UUIDSet * @param uuidSet UUIDSet to be added - * @return the old {@link UUIDSet} for the server given in uuidSet param, - * or {@code null} if there are no UUIDSet for the given server. + * @return the old {@link UUIDSet} for the TSID given in uuidSet param, + * or {@code null} if there are no UUIDSet for the given TSID. */ public UUIDSet putUUIDSet(UUIDSet uuidSet) { - return map.put(uuidSet.getServerId(), uuidSet); + return map.put(uuidSet.getTsid(), uuidSet); } /** @@ -115,9 +172,11 @@ public void addGtid(Object gtid) { } private boolean add(MySqlGtid mySqlGtid) { - UUIDSet uuidSet = map.get(mySqlGtid.getServerId()); + final Tsid tsid = new Tsid(mySqlGtid.getServerId(), mySqlGtid.getTag()); + UUIDSet uuidSet = map.get(tsid); if (uuidSet == null) { - map.put(mySqlGtid.getServerId(), uuidSet = new UUIDSet(mySqlGtid.getServerId(), new ArrayList())); + map.put(tsid, uuidSet = new UUIDSet(mySqlGtid.getServerId(), mySqlGtid.getTag(), + new ArrayList())); } return uuidSet.add(mySqlGtid.getTransactionId()); } @@ -140,7 +199,7 @@ public boolean isContainedWithin(GtidSet other) { return true; } for (UUIDSet uuidSet : map.values()) { - UUIDSet thatSet = other.map.get(uuidSet.getServerId()); + UUIDSet thatSet = other.map.get(uuidSet.getTsid()); if (!uuidSet.isContainedWithin(thatSet)) { return false; } @@ -168,8 +227,18 @@ public boolean equals(Object obj) { @Override public String toString() { List gtids = new ArrayList(); - for (UUIDSet uuidSet : map.values()) { - gtids.add(uuidSet.getServerId() + ":" + join(uuidSet.intervals, ":")); + for (Map.Entry> entry : getUUIDSetGroups().entrySet()) { + StringBuilder sb = new StringBuilder(); + sb.append(entry.getKey()).append(':'); + Iterator iter = entry.getValue().iterator(); + if (iter.hasNext()) { + appendTaggedIntervals(sb, iter.next()); + } + while (iter.hasNext()) { + sb.append(':'); + appendTaggedIntervals(sb, iter.next()); + } + gtids.add(sb.toString()); } return join(gtids, ","); } @@ -189,6 +258,54 @@ private static String join(Collection o, String delimiter) { return sb.substring(0, sb.length() - delimiter.length()); } + private Map> getUUIDSetGroups() { + Map> groups = new LinkedHashMap>(); + for (UUIDSet uuidSet : map.values()) { + List uuidSets = groups.get(uuidSet.getServerId()); + if (uuidSets == null) { + uuidSets = new ArrayList(); + groups.put(uuidSet.getServerId(), uuidSets); + } + uuidSets.add(uuidSet); + } + return groups; + } + + private static void appendTaggedIntervals(StringBuilder sb, UUIDSet uuidSet) { + if (uuidSet.getTag() != null) { + sb.append(uuidSet.getTag()).append(':'); + } + sb.append(join(uuidSet.intervals, ":")); + } + + private static final class Tsid { + + private final UUID uuid; + private final String tag; + + private Tsid(UUID uuid, String tag) { + this.uuid = uuid; + this.tag = tag == null || tag.isEmpty() ? null : tag; + } + + @Override + public int hashCode() { + return 31 * uuid.hashCode() + (tag == null ? 0 : tag.hashCode()); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj instanceof Tsid) { + Tsid that = (Tsid) obj; + return this.uuid.equals(that.uuid) && Objects.equals(this.tag, that.tag); + } + return false; + } + } + /** * A range of GTIDs for a single server with a specific UUID. * @see GtidSet @@ -196,6 +313,7 @@ private static String join(Collection o, String delimiter) { public static final class UUIDSet { private final UUID uuid; + private final String tag; private final List intervals; public UUIDSet(String uuid, List intervals) { @@ -203,10 +321,20 @@ public UUIDSet(String uuid, List intervals) { } public UUIDSet(UUID uuid, List intervals) { + this(uuid, null, intervals); + } + + public UUIDSet(String uuid, String tag, List intervals) { + this(UUID.fromString(uuid), tag, intervals); + } + + public UUIDSet(UUID uuid, String tag, List intervals) { this.uuid = uuid; + this.tag = tag == null || tag.isEmpty() ? null : tag; this.intervals = intervals; if (intervals.size() > 1) { - joinAdjacentIntervals(0); + Collections.sort(intervals); + joinAdjacentIntervals(); } } @@ -237,13 +365,23 @@ private boolean add(long transactionId) { } /** - * Collapses intervals like a-(b-1):b-c into a-c (only in index+-1 range). + * Collapses adjacent or overlapping intervals near the supplied index. */ private void joinAdjacentIntervals(int index) { for (int i = Math.min(index + 1, intervals.size() - 1), e = Math.max(index - 1, 0); i > e; i--) { Interval a = intervals.get(i - 1), b = intervals.get(i); - if (a.end + 1 == b.start) { - a.end = b.end; + if (a.end + 1 >= b.start) { + a.end = Math.max(a.end, b.end); + intervals.remove(i); + } + } + } + + private void joinAdjacentIntervals() { + for (int i = intervals.size() - 1; i > 0; i--) { + Interval a = intervals.get(i - 1), b = intervals.get(i); + if (a.end + 1 >= b.start) { + a.end = Math.max(a.end, b.end); intervals.remove(i); } } @@ -285,6 +423,14 @@ public UUID getServerId() { return uuid; } + public String getTag() { + return tag; + } + + private Tsid getTsid() { + return new Tsid(uuid, tag); + } + /** * Get the intervals of transaction numbers. @@ -309,6 +455,9 @@ public boolean isContainedWithin(UUIDSet other) { // not even the same server ... return false; } + if (!Objects.equals(this.tag, other.tag)) { + return false; + } if (this.intervals.isEmpty()) { return true; } @@ -333,7 +482,7 @@ public boolean isContainedWithin(UUIDSet other) { @Override public int hashCode() { - return uuid.hashCode(); + return 31 * uuid.hashCode() + (tag == null ? 0 : tag.hashCode()); } @Override @@ -344,6 +493,7 @@ public boolean equals(Object obj) { if (obj instanceof UUIDSet) { UUIDSet that = (UUIDSet) obj; return this.uuid.equals(that.uuid) && + Objects.equals(this.tag, that.tag) && this.getIntervals().equals(that.getIntervals()); } return super.equals(obj); @@ -356,6 +506,9 @@ public String toString() { sb.append(','); } sb.append(uuid).append(':'); + if (tag != null) { + sb.append(tag).append(':'); + } Iterator iter = intervals.iterator(); if (iter.hasNext()) { sb.append(iter.next()); diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java b/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java index 0525d1ff..f34243b7 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/EventType.java @@ -208,6 +208,17 @@ public enum EventType { */ TRANSACTION_PAYLOAD(40), + /** + * Heartbeat event version 2. + */ + HEARTBEAT_V2(41), + + /** + * Global Transaction Identifier with tag support (MySQL 8.3+). + * Similar to GTID but includes an optional tag field for organizational purposes. + */ + GTID_TAGGED(42), + /** * MariaDB Support Events * diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/GtidTaggedEventData.java b/src/main/java/com/github/shyiko/mysql/binlog/event/GtidTaggedEventData.java new file mode 100644 index 00000000..6502b1c3 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/GtidTaggedEventData.java @@ -0,0 +1,138 @@ +/* + * Copyright 2013 Patrick Prasse + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog.event; + +/** + * GTID_TAGGED_LOG_EVENT (MySQL 8.3+) - Global Transaction Identifier with tag support. + * Similar to GTID_LOG_EVENT but includes an optional tag field for organizational purposes. + * + * @author Patrick Prasse + */ +public class GtidTaggedEventData implements EventData { + + public static final byte COMMIT_FLAG = 1; + + private MySqlGtid gtid; + private byte flags; + private long lastCommitted; + private long sequenceNumber; + private long immediateCommitTimestamp; + private long originalCommitTimestamp; + private long transactionLength; + private int immediateServerVersion; + private int originalServerVersion; + private long commitGroupTicket; + + @Deprecated + public GtidTaggedEventData() { + } + + public GtidTaggedEventData(MySqlGtid gtid, byte flags, long lastCommitted, long sequenceNumber, + long immediateCommitTimestamp, long originalCommitTimestamp, + long transactionLength, int immediateServerVersion, + int originalServerVersion, long commitGroupTicket) { + this.gtid = gtid; + this.flags = flags; + this.lastCommitted = lastCommitted; + this.sequenceNumber = sequenceNumber; + this.immediateCommitTimestamp = immediateCommitTimestamp; + this.originalCommitTimestamp = originalCommitTimestamp; + this.transactionLength = transactionLength; + this.immediateServerVersion = immediateServerVersion; + this.originalServerVersion = originalServerVersion; + this.commitGroupTicket = commitGroupTicket; + } + + @Deprecated + public String getGtid() { + return gtid.toString(); + } + + @Deprecated + public void setGtid(String gtid) { + this.gtid = MySqlGtid.fromString(gtid); + } + + public MySqlGtid getMySqlGtid() { + return gtid; + } + + public byte getFlags() { + return flags; + } + + public long getLastCommitted() { + return lastCommitted; + } + + public long getSequenceNumber() { + return sequenceNumber; + } + + public long getImmediateCommitTimestamp() { + return immediateCommitTimestamp; + } + + public long getOriginalCommitTimestamp() { + return originalCommitTimestamp; + } + + public long getTransactionLength() { + return transactionLength; + } + + public int getImmediateServerVersion() { + return immediateServerVersion; + } + + public int getOriginalServerVersion() { + return originalServerVersion; + } + + public long getCommitGroupTicket() { + return commitGroupTicket; + } + + @Deprecated + public void setFlags(byte flags) { + this.flags = flags; + } + + public String toString() { + final StringBuilder sb = new StringBuilder(); + sb.append("GtidTaggedEventData"); + sb.append("{flags=").append(flags).append(", gtid='").append(gtid).append('\''); + sb.append(", last_committed='").append(lastCommitted).append('\''); + sb.append(", sequence_number='").append(sequenceNumber).append('\''); + if (immediateCommitTimestamp != 0) { + sb.append(", immediate_commit_timestamp='").append(immediateCommitTimestamp).append('\''); + sb.append(", original_commit_timestamp='").append(originalCommitTimestamp).append('\''); + } + if (transactionLength != 0) { + sb.append(", transaction_length='").append(transactionLength).append('\''); + if (immediateServerVersion != 0) { + sb.append(", immediate_server_version='").append(immediateServerVersion).append('\''); + sb.append(", original_server_version='").append(originalServerVersion).append('\''); + } + } + if (commitGroupTicket != 0) { + sb.append(", commit_group_ticket='").append(commitGroupTicket).append('\''); + } + sb.append('}'); + return sb.toString(); + } + +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/MySqlGtid.java b/src/main/java/com/github/shyiko/mysql/binlog/event/MySqlGtid.java index c7fc3f2c..bb4929ef 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/MySqlGtid.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/MySqlGtid.java @@ -2,25 +2,107 @@ import java.util.UUID; +/** + * Represents a MySQL GTID (Global Transaction Identifier). + * Supports both legacy format (uuid:transaction_id) and MySQL 8.3+ tagged format (uuid:tag:transaction_id). + */ public class MySqlGtid { + public static final int TAG_MAX_LENGTH = 32; + + private final String tag; private final UUID serverId; private final long transactionId; - public MySqlGtid(UUID serverId, long transactionId) { + /** + * Creates a MySqlGtid without a tag (legacy format). + * + * @param serverId the server UUID + * @param transactionId the transaction ID + */ + public MySqlGtid(final UUID serverId, final long transactionId) { + this(null, serverId, transactionId); + } + + /** + * Creates a MySqlGtid with an optional tag (MySQL 8.3+ format). + * + * @param tag the optional tag (null or empty for legacy format) + * @param serverId the server UUID + * @param transactionId the transaction ID + */ + public MySqlGtid(final String tag, final UUID serverId, final long transactionId) { + this.tag = tag; this.serverId = serverId; this.transactionId = transactionId; } - public static MySqlGtid fromString(String gtid) { - String[] split = gtid.split(":"); - String sourceId = split[0]; - long transactionId = Long.parseLong(split[1]); - return new MySqlGtid(UUID.fromString(sourceId), transactionId); + /** + * Parses a GTID string in either legacy or tagged format. + *

+ * Supported formats: + *

    + *
  • Legacy: uuid:transaction_id
  • + *
  • Tagged (MySQL 8.3+): uuid:tag:transaction_id
  • + *
+ * + * @param gtid the GTID string to parse + * @return the parsed MySqlGtid + * @throws IllegalArgumentException if the format is invalid + */ + public static MySqlGtid fromString(final String gtid) { + final String[] split = gtid.split(":"); + + if (split.length == 2) { + // Legacy format: uuid:transaction_id + final String sourceId = split[0]; + final long transactionId = Long.parseLong(split[1]); + return new MySqlGtid(UUID.fromString(sourceId), transactionId); + } + else if (split.length == 3) { + final String sourceId; + final String tag; + if (isValidUuidFormat(split[0])) { + // Tagged format: uuid:tag:transaction_id + sourceId = split[0]; + tag = split[1]; + } + else { + // Backward compatibility for the previous tag-first representation. + tag = split[0]; + sourceId = split[1]; + } + if (tag.length() > TAG_MAX_LENGTH) { + throw new IllegalArgumentException( + "Invalid GTID format: tag length " + tag.length() + + " exceeds maximum " + TAG_MAX_LENGTH + ); + } + final long transactionId = Long.parseLong(split[2]); + return new MySqlGtid(tag, UUID.fromString(sourceId), transactionId); + } + else { + throw new IllegalArgumentException( + "Invalid GTID format: " + gtid + + ". Expected format: 'uuid:transaction_id' or 'uuid:tag:transaction_id'" + ); + } } @Override public String toString() { - return serverId.toString()+":"+transactionId; + if (tag != null && !tag.isEmpty()) { + return serverId.toString() + ":" + tag + ":" + transactionId; + } + return serverId.toString() + ":" + transactionId; + } + + /** + * Gets the optional tag (MySQL 8.3+). + * + * @return the tag, or null if not present + */ + public String getTag() { + return tag; } public UUID getServerId() { @@ -30,4 +112,8 @@ public UUID getServerId() { public long getTransactionId() { return transactionId; } -} + + private static boolean isValidUuidFormat(final String str) { + return str.matches("[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"); + } +} \ No newline at end of file diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java index 6ef2616d..1b3b4776 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java @@ -116,6 +116,8 @@ private void registerDefaultEventDataDeserializers() { new RowsQueryEventDataDeserializer()); eventDataDeserializers.put(EventType.GTID, new GtidEventDataDeserializer()); + eventDataDeserializers.put(EventType.GTID_TAGGED, + new GtidTaggedEventDataDeserializer()); eventDataDeserializers.put(EventType.PREVIOUS_GTIDS, new PreviousGtidSetDeserializer()); eventDataDeserializers.put(EventType.XA_PREPARE, diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/GtidTaggedEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/GtidTaggedEventDataDeserializer.java new file mode 100644 index 00000000..8505e619 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/GtidTaggedEventDataDeserializer.java @@ -0,0 +1,197 @@ +/* + * Copyright 2013 Patrick Prasse + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog.event.deserialization; + +import com.github.shyiko.mysql.binlog.event.GtidTaggedEventData; +import com.github.shyiko.mysql.binlog.event.MySqlGtid; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +/** + * Deserializer for GTID_TAGGED_LOG_EVENT (MySQL 8.3+). + * + * @author Patrick Prasse + */ +public class GtidTaggedEventDataDeserializer implements EventDataDeserializer { + private static final int SERIALIZATION_FORMAT_VERSION = 1; + + @Override + public GtidTaggedEventData deserialize(ByteArrayInputStream inputStream) throws IOException { + final Cursor cursor = new Cursor(inputStream.read(inputStream.available())); + final long serializationFormatVersion = cursor.readUnsignedVarLong(); + if (serializationFormatVersion != SERIALIZATION_FORMAT_VERSION) { + throw new IOException("Unexpected GTID_TAGGED serialization format version " + serializationFormatVersion); + } + final int eventEnd = cursor.checkedInt(cursor.readUnsignedVarLong(), "GTID_TAGGED event length"); + cursor.readUnsignedVarLong(); // last non-ignorable field id + + cursor.readFieldId(0); + final byte flags = (byte) cursor.readUnsignedVarLong(); + + cursor.readFieldId(1); + final long sourceIdMostSignificantBits = cursor.readUuidLong(); + final long sourceIdLeastSignificantBits = cursor.readUuidLong(); + + cursor.readFieldId(2); + final long transactionId = cursor.readSignedVarLong(); + + cursor.readFieldId(3); + final String tag = cursor.readString(MySqlGtid.TAG_MAX_LENGTH); + + final MySqlGtid gtid = new MySqlGtid( + tag.isEmpty() ? null : tag, + new UUID(sourceIdMostSignificantBits, sourceIdLeastSignificantBits), + transactionId + ); + + cursor.readFieldId(4); + final long lastCommitted = cursor.readSignedVarLong(); + + cursor.readFieldId(5); + final long sequenceNumber = cursor.readSignedVarLong(); + + cursor.readFieldId(6); + final long immediateCommitTimestamp = cursor.readUnsignedVarLong(); + + final long originalCommitTimestamp; + if (cursor.nextFieldId(eventEnd) == 7) { + cursor.readFieldId(7); + originalCommitTimestamp = cursor.readUnsignedVarLong(); + } else { + originalCommitTimestamp = immediateCommitTimestamp; + } + + cursor.readFieldId(8); + final long transactionLength = cursor.readUnsignedVarLong(); + + cursor.readFieldId(9); + final int immediateServerVersion = cursor.checkedInt(cursor.readUnsignedVarLong(), "immediate server version"); + + final int originalServerVersion; + if (cursor.nextFieldId(eventEnd) == 10) { + cursor.readFieldId(10); + originalServerVersion = cursor.checkedInt(cursor.readUnsignedVarLong(), "original server version"); + } else { + originalServerVersion = immediateServerVersion; + } + + long commitGroupTicket = 0; + if (cursor.nextFieldId(eventEnd) == 11) { + cursor.readFieldId(11); + commitGroupTicket = cursor.readUnsignedVarLong(); + } + + return new GtidTaggedEventData(gtid, flags, lastCommitted, sequenceNumber, + immediateCommitTimestamp, originalCommitTimestamp, transactionLength, + immediateServerVersion, originalServerVersion, commitGroupTicket); + } + + private static final class Cursor { + private final byte[] bytes; + private int position; + + private Cursor(byte[] bytes) { + this.bytes = bytes; + } + + private void readFieldId(int expectedFieldId) throws IOException { + final long fieldId = readUnsignedVarLong(); + if (fieldId != expectedFieldId) { + throw new IOException("Expected GTID_TAGGED field " + expectedFieldId + " but found " + fieldId); + } + } + + private int nextFieldId(int eventEnd) throws IOException { + if (position >= Math.min(eventEnd, bytes.length)) { + return -1; + } + final int savedPosition = position; + final long fieldId = readUnsignedVarLong(); + position = savedPosition; + return checkedInt(fieldId, "field id"); + } + + private long readSignedVarLong() throws IOException { + final long value = readUnsignedVarLong(); + return (value & 1) == 0 ? value >>> 1 : -(value >>> 1) - 1; + } + + private long readUnsignedVarLong() throws IOException { + final int firstByte = readByte(); + int byteCount = 1; + while (byteCount < 9 && ((firstByte >>> (byteCount - 1)) & 1) == 1) { + byteCount++; + } + if (position + byteCount - 1 > bytes.length) { + throw new IOException("Unexpected end of GTID_TAGGED variable-length integer"); + } + + long result = (firstByte & 0xffL) >>> byteCount; + if (byteCount == 1) { + return result; + } + + long remaining = 0; + for (int i = 0; i < byteCount - 1; ++i) { + remaining |= (long) readByte() << (i << 3); + } + final int shift = 8 - byteCount + ((byteCount + 7) >> 4); + return result | (remaining << shift); + } + + private long readUuidLong() throws IOException { + long result = 0; + for (int i = 0; i < 8; ++i) { + final long value = readUnsignedVarLong(); + if (value > 0xff) { + throw new IOException("GTID_TAGGED UUID byte exceeds byte range: " + value); + } + result = (result << 8) | value; + } + return result; + } + + private String readString(int maxLength) throws IOException { + final int length = checkedInt(readUnsignedVarLong(), "tag length"); + if (length > maxLength) { + throw new IOException("GTID tag length " + length + " exceeds maximum " + maxLength); + } + if (position + length > bytes.length) { + throw new IOException("Unexpected end of GTID tag"); + } + final String result = new String(bytes, position, length, StandardCharsets.UTF_8); + position += length; + return result; + } + + private int checkedInt(long value, String name) throws IOException { + if (value > Integer.MAX_VALUE) { + throw new IOException("GTID_TAGGED " + name + " exceeds integer range: " + value); + } + return (int) value; + } + + private int readByte() throws IOException { + if (position >= bytes.length) { + throw new IOException("Unexpected end of GTID_TAGGED event"); + } + return bytes[position++] & 0xff; + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/PreviousGtidSetDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/PreviousGtidSetDeserializer.java index ca1fee73..43b176a8 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/PreviousGtidSetDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/PreviousGtidSetDeserializer.java @@ -18,6 +18,8 @@ import static java.lang.String.format; import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; import com.github.shyiko.mysql.binlog.event.PreviousGtidSetEventData; import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; @@ -42,17 +44,14 @@ public PreviousGtidSetEventData deserialize( nUuidsMask = 0x00ffffffffffff00L; nUuids = (int) ((nUuidsEncoded & nUuidsMask) >> 8); } - String[] gtids = new String[nUuids]; + Map gtids = new LinkedHashMap(); for (int i = 0; i < nUuids; i++) { String uuid = formatUUID(inputStream.read(16)); + String tag = null; if (formatEncoded==TAGGED_GTID) { int len = inputStream.readInteger(1); - String tag = inputStream.readString(len >> 1); - - if (!tag.isEmpty()) { - uuid += ":" + tag; - } + tag = inputStream.readString(len >> 1); } int nIntervals = inputStream.readInteger(8); @@ -63,9 +62,18 @@ public PreviousGtidSetEventData deserialize( intervals[j] = start + "-" + (end - 1); } - gtids[i] = format("%s:%s", uuid, join(intervals, ":")); + String uuidSet = gtids.get(uuid); + if (uuidSet == null) { + uuidSet = uuid; + } + if (tag != null && !tag.isEmpty()) { + uuidSet = format("%s:%s:%s", uuidSet, tag, join(intervals, ":")); + } else { + uuidSet = format("%s:%s", uuidSet, join(intervals, ":")); + } + gtids.put(uuid, uuidSet); } - return new PreviousGtidSetEventData(join(gtids, ",")); + return new PreviousGtidSetEventData(join(gtids.values().toArray(new String[0]), ",")); } private String formatUUID(byte[] bytes) { diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogGtidCommand.java b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogGtidCommand.java index 6f28eb48..a0efd14d 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogGtidCommand.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogGtidCommand.java @@ -48,15 +48,23 @@ public byte[] toByteArray() throws IOException { buffer.writeString(this.binlogFilename); buffer.writeLong(this.binlogPosition, 8); Collection uuidSets = gtidSet.getUUIDSets(); + boolean taggedFormat = containsTaggedGtid(uuidSets); int dataSize = 8 /* number of uuidSets */; for (GtidSet.UUIDSet uuidSet : uuidSets) { - dataSize += 16 /* uuid */ + 8 /* number of intervals */ + + dataSize += 16 /* uuid */ + (taggedFormat ? 1 + getTagLength(uuidSet) : 0) + 8 /* number of intervals */ + uuidSet.getIntervals().size() /* number of intervals */ * 16 /* start-end */; } buffer.writeInteger(dataSize, 4); - buffer.writeLong(uuidSets.size(), 8); + buffer.writeLong(getEncodedUuidSetCount(uuidSets.size(), taggedFormat), 8); for (GtidSet.UUIDSet uuidSet : uuidSets) { buffer.write(hexToByteArray(uuidSet.getUUID().replace("-", ""))); + if (taggedFormat) { + String tag = uuidSet.getTag(); + buffer.writeInteger(getTagLength(uuidSet) << 1, 1); + if (tag != null) { + buffer.writeString(tag); + } + } Collection intervals = uuidSet.getIntervals(); buffer.writeLong(intervals.size(), 8); for (GtidSet.Interval interval : intervals) { @@ -67,6 +75,27 @@ public byte[] toByteArray() throws IOException { return buffer.toByteArray(); } + private static boolean containsTaggedGtid(Collection uuidSets) { + for (GtidSet.UUIDSet uuidSet : uuidSets) { + if (uuidSet.getTag() != null && !uuidSet.getTag().isEmpty()) { + return true; + } + } + return false; + } + + private static int getTagLength(GtidSet.UUIDSet uuidSet) { + String tag = uuidSet.getTag(); + return tag == null ? 0 : tag.length(); + } + + private static long getEncodedUuidSetCount(int uuidSetCount, boolean taggedFormat) { + if (!taggedFormat) { + return uuidSetCount; + } + return (1L << 56) | ((long) uuidSetCount << 8) | 1L; + } + private static byte[] hexToByteArray(String uuid) { byte[] b = new byte[uuid.length() / 2]; for (int i = 0, j = 0; j < uuid.length(); j += 2) { diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientBinlogCompressIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientBinlogCompressIntegrationTest.java index 46b44e08..0f40d364 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientBinlogCompressIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientBinlogCompressIntegrationTest.java @@ -3,6 +3,8 @@ import org.testng.SkipException; import org.testng.annotations.Test; +import com.github.shyiko.mysql.binlog.network.SSLMode; + /** * @author vjuranek */ diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDIntegrationTest.java index ab06769a..cb4cf187 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDIntegrationTest.java @@ -48,13 +48,13 @@ protected TestDatabaseContainerOptions getOptions() { public void testGTIDAdvancesStatementBased() throws Exception { try { master.execute("set global binlog_format=statement"); - slave.execute("stop slave", "set global binlog_format=statement", "start slave"); + slave.execute("stop replica", "set global binlog_format=statement", "start replica"); master.reconnect(); master.execute("use test"); testGTIDAdvances(); } finally { master.execute("set global binlog_format=row"); - slave.execute("stop slave", "set global binlog_format=row", "start slave"); + slave.execute("stop replica", "set global binlog_format=row", "start replica"); master.reconnect(); master.execute("use test"); } @@ -197,7 +197,12 @@ public void execute(Statement statement) throws SQLException { private String getExecutedGtidSet(MySQLConnection master) throws SQLException { final String[] initialGTIDSet = new String[1]; - master.query("show master status", new Callback() { + // MySQL 8.4+ renamed SHOW MASTER STATUS to SHOW BINARY LOG STATUS + String version = System.getProperty("mysql.image", "mysql:8.0"); + boolean isMySQL84Plus = version.contains("8.4") || version.contains("8.5") || version.contains("9."); + String statusQuery = isMySQL84Plus ? "SHOW BINARY LOG STATUS" : "SHOW MASTER STATUS"; + + master.query(statusQuery, new Callback() { @Override public void execute(ResultSet rs) throws SQLException { rs.next(); diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDTagIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDTagIntegrationTest.java new file mode 100644 index 00000000..ddc8547a --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientGTIDTagIntegrationTest.java @@ -0,0 +1,237 @@ +/* + * Copyright 2024 Debezium Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog; + +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.GtidEventData; +import com.github.shyiko.mysql.binlog.event.GtidTaggedEventData; +import com.github.shyiko.mysql.binlog.event.MySqlGtid; +import com.github.shyiko.mysql.binlog.event.TableMapEventData; +import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; +import org.testng.SkipException; +import org.testng.annotations.Test; + +import java.sql.Statement; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +/** + * Integration tests for MySQL 8.3+ GTID tag support using SET gtid_next. + * + * @author Debezium Authors + */ +public class BinaryLogClientGTIDTagIntegrationTest extends BinaryLogClientIntegrationTest { + + private static final String TEST_TAG = "testtag"; + + @Override + protected TestDatabaseContainerOptions getOptions() { + if (!this.mysqlVersion.atLeast(8, 3)) { + throw new SkipException("GTID tags require MySQL 8.3+"); + } + + TestDatabaseContainerOptions options = new TestDatabaseContainerOptions(); + options.gtid = true; + return options; + } + + @Test + public void testGtidTagWithSetGtidNext() throws Exception { + master.execute("CREATE TABLE if not exists gtid_tag_test (id int primary key, value varchar(50))"); + + final String initialGTIDSet = getExecutedGtidSet(); + assertNotNull(initialGTIDSet, "Initial GTID set is null"); + + final AtomicReference capturedTag = new AtomicReference<>(); + final AtomicReference capturedGtid = new AtomicReference<>(); + + try { + client.disconnect(); + final BinaryLogClient taggedClient = new BinaryLogClient(master.hostname(), master.port(), + master.username(), master.password()); + + taggedClient.setGtidSet(initialGTIDSet); + taggedClient.registerEventListener(new BinaryLogClient.EventListener() { + @Override + public void onEvent(Event event) { + final EventType eventType = event.getHeader().getEventType(); + if (eventType == EventType.GTID) { + final GtidEventData gtidData = (GtidEventData) event.getData(); + final MySqlGtid gtid = gtidData.getMySqlGtid(); + capturedGtid.set(gtid); + capturedTag.set(gtid.getTag()); + } else if (eventType == EventType.GTID_TAGGED) { + final GtidTaggedEventData gtidData = (GtidTaggedEventData) event.getData(); + final MySqlGtid gtid = gtidData.getMySqlGtid(); + capturedGtid.set(gtid); + capturedTag.set(gtid.getTag()); + } + } + }); + taggedClient.registerEventListener(new TraceEventListener()); + taggedClient.registerEventListener(eventListener); + + try { + eventListener.reset(); + taggedClient.connect(DEFAULT_TIMEOUT); + + // Execute a transaction with a tagged GTID using SET gtid_next. + master.execute(new Callback() { + @Override + public void execute(Statement statement) throws java.sql.SQLException { + statement.execute("SET SESSION gtid_next = 'AUTOMATIC:" + TEST_TAG + "'"); + statement.execute("INSERT INTO gtid_tag_test (id, value) VALUES (1, 'tagged')"); + } + }); + master.execute("SET SESSION gtid_next = 'AUTOMATIC'"); + + // Wait for the tagged GTID event. + eventListener.waitForAtLeast(EventType.GTID_TAGGED, 1, TimeUnit.SECONDS.toMillis(4)); + + // Verify the tag was captured + assertNotNull(capturedTag.get(), "GTID tag was not captured"); + assertEquals(capturedTag.get(), TEST_TAG, "GTID tag does not match expected value"); + + // Verify the full GTID includes the tag + assertNotNull(capturedGtid.get(), "GTID was not captured"); + String gtidString = capturedGtid.get().toString(); + assertNotNull(gtidString, "GTID toString returned null"); + assertEquals(gtidString.split(":")[1], TEST_TAG, "GTID string does not include expected tag"); + + } finally { + taggedClient.disconnect(); + } + } finally { + client.connect(DEFAULT_TIMEOUT); + master.execute("DROP TABLE IF EXISTS gtid_tag_test"); + } + } + + @Test + public void testMixedTaggedAndNonTaggedGtids() throws Exception { + master.execute("CREATE TABLE if not exists mixed_gtid_test (id int primary key, value varchar(50))"); + + final String initialGTIDSet = getExecutedGtidSet(); + assertNotNull(initialGTIDSet, "Initial GTID set is null"); + + final AtomicReference firstTag = new AtomicReference<>(); + final AtomicReference secondTag = new AtomicReference<>(); + final CountDownLatch mixedRows = new CountDownLatch(2); + + try { + client.disconnect(); + final BinaryLogClient taggedClient = new BinaryLogClient(master.hostname(), master.port(), + master.username(), master.password()); + + taggedClient.setGtidSet(initialGTIDSet); + + taggedClient.registerEventListener(new BinaryLogClient.EventListener() { + private final AtomicReference transactionTag = new AtomicReference<>(); + private long mixedTableId = -1; + + @Override + public void onEvent(Event event) { + final EventType eventType = event.getHeader().getEventType(); + if (eventType == EventType.GTID) { + final GtidEventData gtidData = (GtidEventData) event.getData(); + transactionTag.set(gtidData.getMySqlGtid().getTag()); + } else if (eventType == EventType.GTID_TAGGED) { + final GtidTaggedEventData gtidData = (GtidTaggedEventData) event.getData(); + transactionTag.set(gtidData.getMySqlGtid().getTag()); + } else if (eventType == EventType.TABLE_MAP) { + final TableMapEventData tableMap = (TableMapEventData) event.getData(); + if ("mixed_gtid_test".equals(tableMap.getTable())) { + mixedTableId = tableMap.getTableId(); + } + } else if (EventType.isWrite(eventType)) { + final WriteRowsEventData writeRows = (WriteRowsEventData) event.getData(); + if (writeRows.getTableId() == mixedTableId) { + for (Object[] row : writeRows.getRows()) { + if (Integer.valueOf(1).equals(row[0])) { + firstTag.set(transactionTag.get()); + mixedRows.countDown(); + } else if (Integer.valueOf(2).equals(row[0])) { + secondTag.set(transactionTag.get()); + mixedRows.countDown(); + } + } + } + } + } + }); + taggedClient.registerEventListener(eventListener); + + try { + eventListener.reset(); + taggedClient.connect(DEFAULT_TIMEOUT); + + // Execute one tagged transaction and one untagged transaction. + master.execute(new Callback() { + @Override + public void execute(Statement statement) throws java.sql.SQLException { + statement.execute("SET SESSION gtid_next = 'AUTOMATIC:" + TEST_TAG + "'"); + statement.execute("INSERT INTO mixed_gtid_test (id, value) VALUES (1, 'tagged')"); + } + }); + master.execute("SET SESSION gtid_next = 'AUTOMATIC'"); + master.execute("INSERT INTO mixed_gtid_test (id, value) VALUES (2, 'not-tagged')"); + + // Wait for both GTID event types and the rows that belong to this test. + eventListener.waitForAtLeast(EventType.GTID_TAGGED, 1, TimeUnit.SECONDS.toMillis(4)); + eventListener.waitForAtLeast(EventType.GTID, 1, TimeUnit.SECONDS.toMillis(4)); + assertTrue(mixedRows.await(4, TimeUnit.SECONDS), "Timed out waiting for mixed GTID test rows"); + + // Verify the tagged insert was associated with the tag. + assertNotNull(firstTag.get(), "First GTID tag was not captured"); + assertEquals(firstTag.get(), TEST_TAG, "First GTID tag does not match"); + + // Verify the untagged insert was not associated with a tag. + assertNull(secondTag.get(), "Second GTID should not have a tag"); + + } finally { + taggedClient.disconnect(); + } + } finally { + client.connect(DEFAULT_TIMEOUT); + master.execute("DROP TABLE IF EXISTS mixed_gtid_test"); + } + } + + private String getExecutedGtidSet() throws Exception { + final String[] gtidSet = new String[1]; + // MySQL 8.4+ renamed SHOW MASTER STATUS to SHOW BINARY LOG STATUS + String version = System.getProperty("mysql.image", "mysql:8.0"); + boolean isMySQL84Plus = version.contains("8.4") || version.contains("8.5") || version.contains("9."); + String statusQuery = isMySQL84Plus ? "SHOW BINARY LOG STATUS" : "SHOW MASTER STATUS"; + + master.query(statusQuery, new Callback() { + @Override + public void execute(java.sql.ResultSet rs) throws java.sql.SQLException { + if (rs.next()) { + gtidSet[0] = rs.getString("Executed_Gtid_Set"); + } + } + }); + return gtidSet[0]; + } +} diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java index 80203986..f6e1dd6c 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java @@ -1013,7 +1013,7 @@ public void testMysql8FastAuth() throws Exception { @Test public void testSHA2CachingAuthAsDefault() throws Exception { - if ( !mysqlVersion.atLeast(8, 0) ) + if ( !mysqlVersion.atLeast(8, 0) || !mysqlVersion.lessThan(8, 4) ) throw new SkipException("skipping mysql8 auth test"); TestDatabaseContainerOptions opts = new TestDatabaseContainerOptions(); diff --git a/src/test/java/com/github/shyiko/mysql/binlog/GtidSetTest.java b/src/test/java/com/github/shyiko/mysql/binlog/GtidSetTest.java index d6e212f9..0d03b3b1 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/GtidSetTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/GtidSetTest.java @@ -20,9 +20,11 @@ import com.github.shyiko.mysql.binlog.event.MySqlGtid; import org.testng.annotations.Test; +import java.util.Collection; import java.util.LinkedList; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; @@ -184,4 +186,148 @@ public void testAddAnotherObjectAsGtidFails() { GtidSet gtidSet = new GtidSet(""); gtidSet.addGtid(MariadbGtidSet.MariaGtid.parse("1-2-3")); } + + @Test + public void testParseTaggedGtidSet() { + // Test parsing GTID set with tagged GTIDs (MySQL 8.3+) + final GtidSet gtidSet = new GtidSet("24bc7850-2c16-11e6-a073-0242ac110002:mytag:1-5"); + + final Collection uuidSets = gtidSet.getUUIDSets(); + assertEquals(uuidSets.size(), 1); + + final GtidSet.UUIDSet uuidSet = uuidSets.iterator().next(); + assertEquals(uuidSet.getServerId().toString(), "24bc7850-2c16-11e6-a073-0242ac110002"); + assertEquals(uuidSet.getTag(), "mytag"); + assertEquals(uuidSet.getIntervals().size(), 1); + assertEquals(uuidSet.getIntervals().get(0).getStart(), 1L); + assertEquals(uuidSet.getIntervals().get(0).getEnd(), 5L); + assertEquals(gtidSet.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:mytag:1-5"); + } + + @Test + public void testParseTaggedGtidSetWithMultipleIntervals() { + // Test parsing tagged GTID set with multiple intervals + final GtidSet gtidSet = new GtidSet("24bc7850-2c16-11e6-a073-0242ac110002:prod:1-5:10-15:20"); + + final Collection uuidSets = gtidSet.getUUIDSets(); + assertEquals(uuidSets.size(), 1); + + final GtidSet.UUIDSet uuidSet = uuidSets.iterator().next(); + assertEquals(uuidSet.getServerId().toString(), "24bc7850-2c16-11e6-a073-0242ac110002"); + assertEquals(uuidSet.getTag(), "prod"); + assertEquals(uuidSet.getIntervals().size(), 3); + assertEquals(uuidSet.getIntervals().get(0).getStart(), 1L); + assertEquals(uuidSet.getIntervals().get(0).getEnd(), 5L); + assertEquals(uuidSet.getIntervals().get(1).getStart(), 10L); + assertEquals(uuidSet.getIntervals().get(1).getEnd(), 15L); + assertEquals(uuidSet.getIntervals().get(2).getStart(), 20L); + assertEquals(uuidSet.getIntervals().get(2).getEnd(), 20L); + } + + @Test + public void testParseMixedGtidSet() { + // Test parsing GTID set with both tagged and non-tagged GTIDs + final GtidSet gtidSet = new GtidSet( + "24bc7850-2c16-11e6-a073-0242ac110002:1-5," + + "aae57b2f-8e44-11ee-a3d6-a036bcda1a41:mytag:1-10" + ); + + final Collection uuidSets = gtidSet.getUUIDSets(); + assertEquals(uuidSets.size(), 2); + + // Verify both UUIDs are present + assertNotNull(gtidSet.getUUIDSet("24bc7850-2c16-11e6-a073-0242ac110002")); + assertNotNull(gtidSet.getUUIDSet("aae57b2f-8e44-11ee-a3d6-a036bcda1a41", "mytag")); + } + + @Test + public void testParseExecutedGtidSetWithTaggedIntervals() { + final GtidSet gtidSet = new GtidSet( + "24bc7850-2c16-11e6-a073-0242ac110002:1-20:testtag:1-3" + ); + + assertEquals(gtidSet.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:1-20:testtag:1-3"); + } + + @Test + public void testParseExecutedGtidSetWithSeparatedTaggedIntervals() { + final GtidSet gtidSet = new GtidSet( + "24bc7850-2c16-11e6-a073-0242ac110002:1-5:testtag:7-9:othertag:6" + ); + + assertEquals(gtidSet.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:1-5:testtag:7-9:othertag:6-6"); + } + + @Test + public void testParseMixedGtidSetMultipleServers() { + // Test parsing complex GTID set with multiple servers, some tagged + final GtidSet gtidSet = new GtidSet( + "24bc7850-2c16-11e6-a073-0242ac110002:1-5:10," + + "aae57b2f-8e44-11ee-a3d6-a036bcda1a41:tag1:1-10:15-20," + + "994ab859-8ea8-11ee-a568-a036bcda1a41:1-3," + + "bd9794e0-1d65-11ed-a7e7-0adb305b3a12:tag2:5-9" + ); + + final Collection uuidSets = gtidSet.getUUIDSets(); + assertEquals(uuidSets.size(), 4); + } + + @Test + public void testAddTaggedMySqlGtid() { + // Test adding tagged GTID to set + final GtidSet gtidSet = new GtidSet(""); + gtidSet.addGtid(MySqlGtid.fromString("00000000-0000-0000-0000-000000000000:mytag:2")); + + assertEquals(gtidSet.toString(), "00000000-0000-0000-0000-000000000000:mytag:2-2"); + } + + @Test + public void testAddTaggedMySqlGtidToExistingSet() { + // Test adding tagged GTID to existing set + final GtidSet gtidSet = new GtidSet("00000000-0000-0000-0000-000000000000:1"); + gtidSet.addGtid(MySqlGtid.fromString("00000000-0000-0000-0000-000000000000:mytag:2")); + + assertEquals(gtidSet.toString(), "00000000-0000-0000-0000-000000000000:1-1:mytag:2-2"); + } + + @Test + public void testParseTaggedGtidSetWithComplexTag() { + // Test parsing GTID set with complex tag name + final GtidSet gtidSet = new GtidSet("24bc7850-2c16-11e6-a073-0242ac110002:prod_db_01:1-100"); + + final Collection uuidSets = gtidSet.getUUIDSets(); + assertEquals(uuidSets.size(), 1); + + final GtidSet.UUIDSet uuidSet = uuidSets.iterator().next(); + assertEquals(uuidSet.getServerId().toString(), "24bc7850-2c16-11e6-a073-0242ac110002"); + assertEquals(uuidSet.getTag(), "prod_db_01"); + assertEquals(uuidSet.getIntervals().size(), 1); + assertEquals(uuidSet.getIntervals().get(0).getStart(), 1L); + assertEquals(uuidSet.getIntervals().get(0).getEnd(), 100L); + } + + @Test + public void testParseTaggedAndUntaggedGtidSetsAreDistinct() { + final GtidSet legacySet = new GtidSet("24bc7850-2c16-11e6-a073-0242ac110002:1-5"); + final GtidSet taggedSet = new GtidSet("24bc7850-2c16-11e6-a073-0242ac110002:tag:1-5"); + + // Tagged and untagged GTIDs from the same UUID are different TSIDs. + final GtidSet.UUIDSet legacyUuidSet = legacySet.getUUIDSet("24bc7850-2c16-11e6-a073-0242ac110002"); + + assertNotNull(legacyUuidSet); + assertEquals(legacyUuidSet.getTag(), null); + assertEquals(taggedSet.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:tag:1-5"); + assertNotEquals(legacySet, taggedSet); + } + + @Test + public void testParsePreviousTagFirstGtidSetFormat() { + final GtidSet gtidSet = new GtidSet("tag:24bc7850-2c16-11e6-a073-0242ac110002:1-5"); + + final GtidSet.UUIDSet uuidSet = gtidSet.getUUIDSet("24bc7850-2c16-11e6-a073-0242ac110002", "tag"); + assertNotNull(uuidSet); + assertEquals(uuidSet.getIntervals().get(0), new Interval(1, 5)); + assertEquals(gtidSet.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:tag:1-5"); + } + } diff --git a/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java b/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java index 8d84c2bd..56676035 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/MysqlOnetimeServer.java @@ -145,7 +145,13 @@ public void boot() throws Exception { public void setupSlave(int masterPort) throws SQLException { Connection master = DriverManager.getConnection("jdbc:mysql://127.0.0.1:" + masterPort + "/mysql?useSSL=false", "root", ""); - ResultSet rs = master.createStatement().executeQuery("show master status"); + + // MySQL 8.4+ renamed SHOW MASTER STATUS to SHOW BINARY LOG STATUS and replication commands + String version = System.getProperty("mysql.image", "mysql:8.0"); + boolean isMySQL84Plus = version.contains("8.4") || version.contains("8.5") || version.contains("9."); + String statusQuery = isMySQL84Plus ? "SHOW BINARY LOG STATUS" : "show master status"; + + ResultSet rs = master.createStatement().executeQuery(statusQuery); if ( !rs.next() ) throw new RuntimeException("could not get master status"); @@ -153,25 +159,39 @@ public void setupSlave(int masterPort) throws SQLException { Long position = rs.getLong("Position"); rs.close(); - String changeSQL = String.format( - "CHANGE MASTER to master_host = '127.0.0.1', master_user='maxwell', master_password='maxwell', " - + "master_log_file = '%s', master_log_pos = %d, master_port = %d", - file, position, masterPort - ); + String changeSQL; + if (isMySQL84Plus) { + changeSQL = String.format( + "CHANGE REPLICATION SOURCE to SOURCE_HOST = '127.0.0.1', SOURCE_USER='maxwell', SOURCE_PASSWORD='maxwell', " + + "SOURCE_LOG_FILE = '%s', SOURCE_LOG_POS = %d, SOURCE_PORT = %d", + file, position, masterPort + ); + } else { + changeSQL = String.format( + "CHANGE MASTER to master_host = '127.0.0.1', master_user='maxwell', master_password='maxwell', " + + "master_log_file = '%s', master_log_pos = %d, master_port = %d", + file, position, masterPort + ); + } logger.info("starting up slave: " + changeSQL); getConnection().createStatement().execute(changeSQL); - getConnection().createStatement().execute("START SLAVE"); + String startCommand = isMySQL84Plus ? "START REPLICA" : "START SLAVE"; + getConnection().createStatement().execute(startCommand); rs.close(); - ResultSet status = query("show slave status"); + String showStatusCommand = isMySQL84Plus ? "SHOW REPLICA STATUS" : "show slave status"; + ResultSet status = query(showStatusCommand); if ( !status.next() ) throw new RuntimeException("could not get slave status"); - if ( status.getString("Slave_IO_Running").equals("No") - || status.getString("Slave_SQL_Running").equals("No")) { - throw new RuntimeException("could not start slave: " + dumpQuery("show slave status")); + String ioRunningColumn = isMySQL84Plus ? "Replica_IO_Running" : "Slave_IO_Running"; + String sqlRunningColumn = isMySQL84Plus ? "Replica_SQL_Running" : "Slave_SQL_Running"; + + if ( status.getString(ioRunningColumn).equals("No") + || status.getString(sqlRunningColumn).equals("No")) { + throw new RuntimeException("could not start slave: " + dumpQuery(showStatusCommand)); } status.close(); @@ -271,17 +291,25 @@ private static String getVersionString() { } public void waitForSlaveToBeCurrent(MysqlOnetimeServer master) throws Exception { - ResultSet ms = master.query("show master status"); + // MySQL 8.4+ renamed SHOW MASTER STATUS to SHOW BINARY LOG STATUS + String version = System.getProperty("mysql.image", "mysql:8.0"); + boolean isMySQL84Plus = version.contains("8.4") || version.contains("8.5") || version.contains("9."); + String masterStatusQuery = isMySQL84Plus ? "SHOW BINARY LOG STATUS" : "show master status"; + String slaveStatusQuery = isMySQL84Plus ? "SHOW REPLICA STATUS" : "show slave status"; + String relayMasterLogFileColumn = isMySQL84Plus ? "Relay_Source_Log_File" : "Relay_Master_Log_File"; + String execMasterLogPosColumn = isMySQL84Plus ? "Exec_Source_Log_Pos" : "Exec_Master_Log_Pos"; + + ResultSet ms = master.query(masterStatusQuery); ms.next(); String masterFile = ms.getString("File"); Long masterPos = ms.getLong("Position"); ms.close(); while ( true ) { - ResultSet rs = query("show slave status"); + ResultSet rs = query(slaveStatusQuery); rs.next(); - if ( rs.getString("Relay_Master_Log_File").equals(masterFile) && - rs.getLong("Exec_Master_Log_Pos") >= masterPos ) + if ( rs.getString(relayMasterLogFileColumn).equals(masterFile) && + rs.getLong(execMasterLogPosColumn) >= masterPos ) return; Thread.sleep(200); diff --git a/src/test/java/com/github/shyiko/mysql/binlog/TestDatabaseContainer.java b/src/test/java/com/github/shyiko/mysql/binlog/TestDatabaseContainer.java index 15644d0a..4cf7602a 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/TestDatabaseContainer.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/TestDatabaseContainer.java @@ -45,8 +45,14 @@ public TestDatabaseContainer() { * Creates a new test database container with specific options. */ public TestDatabaseContainer(TestDatabaseContainerOptions options) { - // Use version from options first, then fall back to system property - String version = options.version != null ? options.version : getImageFromSystemProperty(); + // Use version from options first, then fall back to system property, then default + String version = options.version; + if (version == null || version.isEmpty()) { + version = getImageFromSystemProperty(); + } + if (version == null || version.isEmpty()) { + version = "mysql:8.0"; // Default version + } logger.info("Using database image: " + version); @@ -54,18 +60,66 @@ public TestDatabaseContainer(TestDatabaseContainerOptions options) { if (version.toLowerCase().contains("mariadb")) { this.databaseType = DatabaseType.MARIADB; DockerImageName imageName = DockerImageName.parse(version).asCompatibleSubstituteFor("mariadb"); + + List commands = new java.util.ArrayList<>(); + commands.add("--log-bin=master"); + commands.add("--binlog_format=row"); + commands.add("--server-id=" + options.serverID); + commands.add("--default-time-zone=+00:00"); + commands.add("--max_allowed_packet=16M"); + + if (options.extraParams != null && !options.extraParams.isEmpty()) { + String[] extraParamArray = options.extraParams.trim().split("\\s+"); + commands.addAll(Arrays.asList(extraParamArray)); + } + this.container = new MariaDBContainer<>(imageName) .withDatabaseName("mysql") .withUsername("root") - .withPassword(""); + .withPassword("") + .withCommand(commands.toArray(new String[0])); } else { this.databaseType = DatabaseType.MYSQL; DockerImageName imageName = DockerImageName.parse(version).asCompatibleSubstituteFor("mysql"); + + // Build command parameters + List commands = new java.util.ArrayList<>(); + + // Configure container based on options + if (options.gtid) { + logger.info("Enabling GTID mode"); + commands.add("--gtid-mode=ON"); + commands.add("--log-slave-updates=ON"); + commands.add("--enforce-gtid-consistency=true"); + } + + commands.add("--log-bin=master"); + commands.add("--binlog_format=row"); + commands.add("--server-id=" + options.serverID); + commands.add("--default-time-zone=+00:00"); + + // Add full row metadata for MySQL 8.0+ + if (options.fullRowMetaData && !version.toLowerCase().startsWith("mariadb")) { + MysqlVersion mysqlVersion = parseVersion(version); + if (mysqlVersion.atLeast(8, 0)) { + commands.add("--binlog-row-metadata=FULL"); + } + } + + // Allow larger data packets for testing + commands.add("--max_allowed_packet=16M"); + + // Add extra parameters if provided + if (options.extraParams != null && !options.extraParams.isEmpty()) { + String[] extraParamArray = options.extraParams.trim().split("\\s+"); + commands.addAll(Arrays.asList(extraParamArray)); + } + this.container = new MySQLContainer<>(imageName) .withDatabaseName("mysql") .withUsername("root") .withPassword("") - .withCommand("--default-time-zone=+00:00"); + .withCommand(commands.toArray(new String[0])); } // Configure network if provided (for master-slave setups) @@ -75,47 +129,14 @@ public TestDatabaseContainer(TestDatabaseContainerOptions options) { container.withNetworkAliases(options.networkAlias); } } - - // Build all command parameters together - List commands = new java.util.ArrayList<>(); - - // Configure container based on options - if (options.gtid) { - logger.info("Enabling GTID mode"); - commands.add("--gtid-mode=ON"); - commands.add("--log-slave-updates=ON"); - commands.add("--enforce-gtid-consistency=true"); - } - - commands.add("--log-bin=master"); - commands.add("--binlog_format=row"); - commands.add("--server-id=" + options.serverID); - commands.add("--default-time-zone=+00:00"); - - // Add full row metadata for MySQL 8.0+ - if (options.fullRowMetaData && !version.toLowerCase().startsWith("mariadb")) { - MysqlVersion mysqlVersion = parseVersion(version); - if (mysqlVersion.atLeast(8, 0)) { - commands.add("--binlog-row-metadata=FULL"); - } - } - - // Allow larger data packets for testing - commands.add("--max_allowed_packet=16M"); - - // Add extra parameters if provided - if (options.extraParams != null && !options.extraParams.isEmpty()) { - String[] extraParamArray = options.extraParams.trim().split("\\s+"); - commands.addAll(Arrays.asList(extraParamArray)); - } - - // Apply all commands at once - container.withCommand(commands.toArray(new String[0])); } private static String getImageFromSystemProperty() { String mysqlImage = System.getProperty("mysql.image"); - return mysqlImage == null ? "mysql:8.0" : mysqlImage; + String result = mysqlImage == null ? "mysql:8.0" : mysqlImage; + Logger.getLogger(TestDatabaseContainer.class.getName()).info( + "mysql.image system property: " + mysqlImage + " -> using: " + result); + return result; } private static MysqlVersion parseVersion(String imageTag) { @@ -156,8 +177,16 @@ public void start() throws Exception { } if (databaseType == DatabaseType.MYSQL) { - this.connection.createStatement().executeUpdate( - "CREATE USER 'maxwell'@'%' IDENTIFIED WITH mysql_native_password BY 'maxwell'"); + // MySQL 8.4+ removed mysql_native_password plugin, use default authentication + String version = getImageFromSystemProperty(); + MysqlVersion mysqlVersion = parseVersion(version); + if (mysqlVersion.atLeast(8, 4)) { + this.connection.createStatement().executeUpdate( + "CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell'"); + } else { + this.connection.createStatement().executeUpdate( + "CREATE USER 'maxwell'@'%' IDENTIFIED WITH mysql_native_password BY 'maxwell'"); + } } else { this.connection.createStatement().executeUpdate( "CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell'"); @@ -307,7 +336,13 @@ public String dumpQuery(String query) throws SQLException { */ public void setupSlave(TestDatabaseContainer master) throws SQLException { Connection masterConn = master.getConnection(); - ResultSet rs = masterConn.createStatement().executeQuery("SHOW MASTER STATUS"); + + // MySQL 8.4+ renamed SHOW MASTER STATUS to SHOW BINARY LOG STATUS + String version = getImageFromSystemProperty(); + MysqlVersion mysqlVersion = parseVersion(version); + String statusQuery = mysqlVersion.atLeast(8, 4) ? "SHOW BINARY LOG STATUS" : "SHOW MASTER STATUS"; + + ResultSet rs = masterConn.createStatement().executeQuery(statusQuery); if (!rs.next()) { throw new RuntimeException("Could not get master status"); } @@ -323,25 +358,75 @@ public void setupSlave(TestDatabaseContainer master) throws SQLException { logger.info("Master container IP: " + masterHost); - String changeSQL = String.format( - "CHANGE MASTER TO master_host = '%s', master_user='maxwell', master_password='maxwell', " + - "master_log_file = '%s', master_log_pos = %d, master_port = %d", - masterHost, file, position, 3306 - ); + // MySQL 8.4+ renamed replication commands + String versionStr = getImageFromSystemProperty(); + MysqlVersion mysqlVer = parseVersion(versionStr); + boolean isMySQL84Plus = mysqlVer.atLeast(8, 4); + + String changeSQL; + String startCommand; + String showStatusCommand; + String ioRunningColumn; + String sqlRunningColumn; + + if (isMySQL84Plus) { + changeSQL = String.format( + "CHANGE REPLICATION SOURCE TO SOURCE_HOST = '%s', SOURCE_USER='maxwell', SOURCE_PASSWORD='maxwell', " + + "SOURCE_LOG_FILE = '%s', SOURCE_LOG_POS = %d, SOURCE_PORT = %d, GET_SOURCE_PUBLIC_KEY = 1", + masterHost, file, position, 3306 + ); + startCommand = "START REPLICA"; + showStatusCommand = "SHOW REPLICA STATUS"; + ioRunningColumn = "Replica_IO_Running"; + sqlRunningColumn = "Replica_SQL_Running"; + } else { + if (databaseType == DatabaseType.MARIADB) { + changeSQL = String.format( + "CHANGE MASTER TO master_host = '%s', master_user='maxwell', master_password='maxwell', " + + "master_log_file = '%s', master_log_pos = %d, master_port = %d", + masterHost, file, position, 3306 + ); + } else { + changeSQL = String.format( + "CHANGE MASTER TO master_host = '%s', master_user='maxwell', master_password='maxwell', " + + "master_log_file = '%s', master_log_pos = %d, master_port = %d, GET_MASTER_PUBLIC_KEY = 1", + masterHost, file, position, 3306 + ); + } + startCommand = "START SLAVE"; + showStatusCommand = "SHOW SLAVE STATUS"; + ioRunningColumn = "Slave_IO_Running"; + sqlRunningColumn = "Slave_SQL_Running"; + } logger.info("Starting up slave: " + changeSQL); getConnection().createStatement().execute(changeSQL); - getConnection().createStatement().execute("START SLAVE"); + getConnection().createStatement().execute(startCommand); - ResultSet status = query("SHOW SLAVE STATUS"); + // Wait a moment for replication to start before checking status + // The IO thread needs time to establish connection to the master + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for replication to start", e); + } + + ResultSet status = query(showStatusCommand); if (!status.next()) { throw new RuntimeException("Could not get slave status"); } - if (status.getString("Slave_IO_Running").equals("No") || - status.getString("Slave_SQL_Running").equals("No")) { - throw new RuntimeException("Could not start slave: " + dumpQuery("SHOW SLAVE STATUS")); + String ioRunning = status.getString(ioRunningColumn); + String sqlRunning = status.getString(sqlRunningColumn); + + // IO thread may still be "Connecting" initially, which is acceptable + // Only fail if it's explicitly "No" (error state) + if (ioRunning.equals("No") || sqlRunning.equals("No")) { + throw new RuntimeException("Could not start slave: " + dumpQuery(showStatusCommand)); } + + logger.info("Slave replication started: IO=" + ioRunning + ", SQL=" + sqlRunning); status.close(); } @@ -349,7 +434,12 @@ public void setupSlave(TestDatabaseContainer master) throws SQLException { * Waits for the slave to catch up with the master using Awaitility. */ public void waitForSlaveToBeCurrent(TestDatabaseContainer master) throws Exception { - ResultSet ms = master.query("SHOW MASTER STATUS"); + // MySQL 8.4+ renamed SHOW MASTER STATUS to SHOW BINARY LOG STATUS + String version = getImageFromSystemProperty(); + MysqlVersion mysqlVersion = parseVersion(version); + String statusQuery = mysqlVersion.atLeast(8, 4) ? "SHOW BINARY LOG STATUS" : "SHOW MASTER STATUS"; + + ResultSet ms = master.query(statusQuery); ms.next(); final String masterFile = ms.getString("File"); final Long masterPos = ms.getLong("Position"); @@ -357,28 +447,36 @@ public void waitForSlaveToBeCurrent(TestDatabaseContainer master) throws Excepti logger.info("Waiting for slave to catch up: master file=" + masterFile + ", position=" + masterPos); + // MySQL 8.4+ renamed SHOW SLAVE STATUS to SHOW REPLICA STATUS + final boolean isMySQL84Plus = mysqlVersion.atLeast(8, 4); + final String showStatusCommand = isMySQL84Plus ? "SHOW REPLICA STATUS" : "SHOW SLAVE STATUS"; + final String ioRunningColumn = isMySQL84Plus ? "Replica_IO_Running" : "Slave_IO_Running"; + final String sqlRunningColumn = isMySQL84Plus ? "Replica_SQL_Running" : "Slave_SQL_Running"; + final String relayMasterLogFileColumn = isMySQL84Plus ? "Relay_Source_Log_File" : "Relay_Master_Log_File"; + final String execMasterLogPosColumn = isMySQL84Plus ? "Exec_Source_Log_Pos" : "Exec_Master_Log_Pos"; + Awaitility.await() .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofMillis(200)) .pollDelay(Duration.ZERO) .ignoreExceptions() .until(() -> { - try (ResultSet rs = query("SHOW SLAVE STATUS")) { + try (ResultSet rs = query(showStatusCommand)) { if (!rs.next()) { logger.warning("Slave status not available"); return false; } - String slaveIORunning = rs.getString("Slave_IO_Running"); - String slaveSQLRunning = rs.getString("Slave_SQL_Running"); - String relayMasterLogFile = rs.getString("Relay_Master_Log_File"); - Long execMasterLogPos = rs.getLong("Exec_Master_Log_Pos"); + String slaveIORunning = rs.getString(ioRunningColumn); + String slaveSQLRunning = rs.getString(sqlRunningColumn); + String relayMasterLogFile = rs.getString(relayMasterLogFileColumn); + Long execMasterLogPos = rs.getLong(execMasterLogPosColumn); logger.info("Slave status: IO=" + slaveIORunning + ", SQL=" + slaveSQLRunning + ", file=" + relayMasterLogFile + ", pos=" + execMasterLogPos); if (!"Yes".equals(slaveIORunning) || !"Yes".equals(slaveSQLRunning)) { - throw new RuntimeException("Slave replication not running: " + dumpQuery("SHOW SLAVE STATUS")); + throw new RuntimeException("Slave replication not running: " + dumpQuery(showStatusCommand)); } boolean caughtUp = relayMasterLogFile.equals(masterFile) && execMasterLogPos >= masterPos; diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/EventTypeTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/EventTypeTest.java index df1e1718..58ea74d4 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/event/EventTypeTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/EventTypeTest.java @@ -74,7 +74,7 @@ public void testIsDelete() throws Exception { public void testByEventNumber() { assertEquals(EventType.byEventNumber(0), EventType.UNKNOWN); assertNull(EventType.byEventNumber(-1)); - assertNull(EventType.byEventNumber(41)); + assertNull(EventType.byEventNumber(43)); assertNull(EventType.byEventNumber(159)); assertNull(EventType.byEventNumber(164)); assertEquals(EventType.byEventNumber(1), EventType.START_V3); diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/MySqlGtidTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/MySqlGtidTest.java new file mode 100644 index 00000000..ffce2f77 --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/MySqlGtidTest.java @@ -0,0 +1,190 @@ +package com.github.shyiko.mysql.binlog.event; + +import org.testng.annotations.Test; + +import java.util.UUID; + +import static org.testng.Assert.*; + +/** + * Unit tests for MySqlGtid class, including MySQL 8.3+ tagged GTID support. + */ +public class MySqlGtidTest { + + @Test + public void testParseLegacyFormat() { + // Test backward compatibility with non-tagged GTIDs + final MySqlGtid gtid = MySqlGtid.fromString("24bc7850-2c16-11e6-a073-0242ac110002:11"); + + assertNull(gtid.getTag()); + assertEquals(gtid.getServerId().toString(), "24bc7850-2c16-11e6-a073-0242ac110002"); + assertEquals(gtid.getTransactionId(), 11L); + assertEquals(gtid.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:11"); + } + + @Test + public void testParseTaggedFormat() { + // Test MySQL 8.3+ tagged GTID format + final MySqlGtid gtid = MySqlGtid.fromString("24bc7850-2c16-11e6-a073-0242ac110002:mytag:11"); + + assertEquals(gtid.getTag(), "mytag"); + assertEquals(gtid.getServerId().toString(), "24bc7850-2c16-11e6-a073-0242ac110002"); + assertEquals(gtid.getTransactionId(), 11L); + assertEquals(gtid.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:mytag:11"); + } + + @Test + public void testParseTaggedFormatWithComplexTag() { + // Test with more complex tag names + final MySqlGtid gtid = MySqlGtid.fromString("aae57b2f-8e44-11ee-a3d6-a036bcda1a41:prod_db_01:42"); + + assertEquals(gtid.getTag(), "prod_db_01"); + assertEquals(gtid.getServerId().toString(), "aae57b2f-8e44-11ee-a3d6-a036bcda1a41"); + assertEquals(gtid.getTransactionId(), 42L); + assertEquals(gtid.toString(), "aae57b2f-8e44-11ee-a3d6-a036bcda1a41:prod_db_01:42"); + } + + @Test + public void testParseTagFirstFormatForBackwardCompatibility() { + // Accept the previous tag-first representation for backward compatibility. + final MySqlGtid gtid = MySqlGtid.fromString("12345:994ab859-8ea8-11ee-a568-a036bcda1a41:3"); + + assertEquals(gtid.getTag(), "12345"); + assertEquals(gtid.getServerId().toString(), "994ab859-8ea8-11ee-a568-a036bcda1a41"); + assertEquals(gtid.getTransactionId(), 3L); + assertEquals(gtid.toString(), "994ab859-8ea8-11ee-a568-a036bcda1a41:12345:3"); + } + + @Test + public void testParseTaggedFormatWithUnderscoreTag() { + // Test with underscore in tag + final MySqlGtid gtid = MySqlGtid.fromString("bd9794e0-1d65-11ed-a7e7-0adb305b3a12:my_tag_123:9"); + + assertEquals(gtid.getTag(), "my_tag_123"); + assertEquals(gtid.getServerId().toString(), "bd9794e0-1d65-11ed-a7e7-0adb305b3a12"); + assertEquals(gtid.getTransactionId(), 9L); + assertEquals(gtid.toString(), "bd9794e0-1d65-11ed-a7e7-0adb305b3a12:my_tag_123:9"); + } + + @Test + public void testConstructorWithoutTag() { + // Test backward compatible constructor + final UUID uuid = UUID.fromString("24bc7850-2c16-11e6-a073-0242ac110002"); + final MySqlGtid gtid = new MySqlGtid(uuid, 11L); + + assertNull(gtid.getTag()); + assertEquals(gtid.getServerId(), uuid); + assertEquals(gtid.getTransactionId(), 11L); + assertEquals(gtid.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:11"); + } + + @Test + public void testConstructorWithTag() { + // Test new constructor with tag + final UUID uuid = UUID.fromString("24bc7850-2c16-11e6-a073-0242ac110002"); + final MySqlGtid gtid = new MySqlGtid("mytag", uuid, 11L); + + assertEquals(gtid.getTag(), "mytag"); + assertEquals(gtid.getServerId(), uuid); + assertEquals(gtid.getTransactionId(), 11L); + assertEquals(gtid.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:mytag:11"); + } + + @Test + public void testConstructorWithEmptyTag() { + // Test that empty tag is treated as no tag in toString + final UUID uuid = UUID.fromString("24bc7850-2c16-11e6-a073-0242ac110002"); + final MySqlGtid gtid = new MySqlGtid("", uuid, 11L); + + assertEquals(gtid.getTag(), ""); + assertEquals(gtid.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:11"); + } + + @Test + public void testConstructorWithNullTag() { + // Test that null tag works correctly + final UUID uuid = UUID.fromString("24bc7850-2c16-11e6-a073-0242ac110002"); + final MySqlGtid gtid = new MySqlGtid(null, uuid, 11L); + + assertNull(gtid.getTag()); + assertEquals(gtid.toString(), "24bc7850-2c16-11e6-a073-0242ac110002:11"); + } + + @Test + public void testParseLegacyFormatWithLargeTransactionId() { + // Test with large transaction ID + final MySqlGtid gtid = MySqlGtid.fromString("24bc7850-2c16-11e6-a073-0242ac110002:9223372036854775807"); + + assertNull(gtid.getTag()); + assertEquals(gtid.getServerId().toString(), "24bc7850-2c16-11e6-a073-0242ac110002"); + assertEquals(gtid.getTransactionId(), Long.MAX_VALUE); + } + + @Test + public void testParseTaggedFormatWithLargeTransactionId() { + // Test tagged format with large transaction ID + final MySqlGtid gtid = MySqlGtid.fromString("24bc7850-2c16-11e6-a073-0242ac110002:tag:9223372036854775807"); + + assertEquals(gtid.getTag(), "tag"); + assertEquals(gtid.getServerId().toString(), "24bc7850-2c16-11e6-a073-0242ac110002"); + assertEquals(gtid.getTransactionId(), Long.MAX_VALUE); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testParseInvalidFormatTooFewParts() { + // Test error handling for invalid format (only UUID, no transaction ID) + MySqlGtid.fromString("24bc7850-2c16-11e6-a073-0242ac110002"); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testParseInvalidFormatTooManyParts() { + // Test error handling for too many parts + MySqlGtid.fromString("tag:uuid:123:extra"); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testParseInvalidUuidInLegacyFormat() { + // Test error handling for invalid UUID in legacy format + MySqlGtid.fromString("invalid-uuid:11"); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testParseInvalidUuidInTaggedFormat() { + // Test error handling for invalid UUID in tagged format + MySqlGtid.fromString("invalid-uuid:mytag:11"); + } + + @Test(expectedExceptions = NumberFormatException.class) + public void testParseInvalidTransactionIdInLegacyFormat() { + // Test error handling for invalid transaction ID in legacy format + MySqlGtid.fromString("24bc7850-2c16-11e6-a073-0242ac110002:notanumber"); + } + + @Test(expectedExceptions = NumberFormatException.class) + public void testParseInvalidTransactionIdInTaggedFormat() { + // Test error handling for invalid transaction ID in tagged format + MySqlGtid.fromString("24bc7850-2c16-11e6-a073-0242ac110002:mytag:notanumber"); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testParseEmptyString() { + // Test error handling for empty string + MySqlGtid.fromString(""); + } + + @Test + public void testRoundTripLegacyFormat() { + // Test that parsing and toString are inverse operations for legacy format + final String original = "24bc7850-2c16-11e6-a073-0242ac110002:11"; + final MySqlGtid gtid = MySqlGtid.fromString(original); + assertEquals(gtid.toString(), original); + } + + @Test + public void testRoundTripTaggedFormat() { + // Test that parsing and toString are inverse operations for tagged format + final String original = "24bc7850-2c16-11e6-a073-0242ac110002:mytag:11"; + final MySqlGtid gtid = MySqlGtid.fromString(original); + assertEquals(gtid.toString(), original); + } +} diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/GtidTaggedEventDataDeserializerTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/GtidTaggedEventDataDeserializerTest.java new file mode 100644 index 00000000..8851399e --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/GtidTaggedEventDataDeserializerTest.java @@ -0,0 +1,329 @@ +package com.github.shyiko.mysql.binlog.event.deserialization; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; + +import com.github.shyiko.mysql.binlog.event.GtidTaggedEventData; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.UUID; +import org.testng.annotations.Test; + +public class GtidTaggedEventDataDeserializerTest { + + private final GtidTaggedEventDataDeserializer deserializer = new GtidTaggedEventDataDeserializer(); + + @Test + public void testDeserializeWithTag() throws IOException { + final GtidTaggedEventData data = deserialize(eventBuilder() + .flags(1) + .uuid("aae57b2f-8e44-11ee-a3d6-a036bcda1a41") + .gno(4) + .tag("test_tag") + .lastCommitted(3) + .sequenceNumber(4) + .immediateCommitTimestamp(1701215692713879L) + .transactionLength(0) + .immediateServerVersion(999999) + .build()); + + assertEquals(data.getFlags(), 0x01); + assertEquals(data.getMySqlGtid().toString(), "aae57b2f-8e44-11ee-a3d6-a036bcda1a41:test_tag:4"); + assertEquals(data.getMySqlGtid().getTag(), "test_tag"); + assertEquals(data.getLastCommitted(), 3); + assertEquals(data.getSequenceNumber(), 4); + assertEquals(data.getImmediateCommitTimestamp(), 1701215692713879L); + assertEquals(data.getOriginalCommitTimestamp(), 1701215692713879L); + assertEquals(data.getTransactionLength(), 0); + assertEquals(data.getImmediateServerVersion(), 999999); + assertEquals(data.getOriginalServerVersion(), 999999); + assertEquals(data.getCommitGroupTicket(), 0); + } + + @Test + public void testDeserializeWithoutTag() throws IOException { + final GtidTaggedEventData data = deserialize(eventBuilder() + .flags(0) + .uuid("994ab859-8ea8-11ee-a568-a036bcda1a41") + .gno(3) + .tag("") + .lastCommitted(2) + .sequenceNumber(3) + .immediateCommitTimestamp(1701257014433088L) + .transactionLength(308) + .immediateServerVersion(999999) + .build()); + + assertEquals(data.getFlags(), 0x00); + assertEquals(data.getMySqlGtid().toString(), "994ab859-8ea8-11ee-a568-a036bcda1a41:3"); + assertNull(data.getMySqlGtid().getTag()); + assertEquals(data.getLastCommitted(), 2); + assertEquals(data.getSequenceNumber(), 3); + assertEquals(data.getImmediateCommitTimestamp(), 1701257014433088L); + assertEquals(data.getOriginalCommitTimestamp(), 1701257014433088L); + assertEquals(data.getTransactionLength(), 308); + assertEquals(data.getImmediateServerVersion(), 999999); + assertEquals(data.getOriginalServerVersion(), 999999); + assertEquals(data.getCommitGroupTicket(), 0); + } + + @Test + public void testDeserializeWithFullFields() throws IOException { + final GtidTaggedEventData data = deserialize(eventBuilder() + .flags(0) + .uuid("bd9794e0-1d65-11ed-a7e7-0adb305b3a12") + .gno(9) + .tag("prod") + .lastCommitted(7) + .sequenceNumber(8) + .immediateCommitTimestamp(1699112309893478L) + .transactionLength(315) + .immediateServerVersion(80100) + .commitGroupTicket(5) + .build()); + + assertEquals(data.getFlags(), 0x00); + assertEquals(data.getMySqlGtid().toString(), "bd9794e0-1d65-11ed-a7e7-0adb305b3a12:prod:9"); + assertEquals(data.getMySqlGtid().getTag(), "prod"); + assertEquals(data.getLastCommitted(), 7); + assertEquals(data.getSequenceNumber(), 8); + assertEquals(data.getImmediateCommitTimestamp(), 1699112309893478L); + assertEquals(data.getOriginalCommitTimestamp(), 1699112309893478L); + assertEquals(data.getTransactionLength(), 315); + assertEquals(data.getImmediateServerVersion(), 80100); + assertEquals(data.getOriginalServerVersion(), 80100); + assertEquals(data.getCommitGroupTicket(), 5); + } + + @Test + public void testDeserializeWithDifferentOriginalTimestamp() throws IOException { + final GtidTaggedEventData data = deserialize(eventBuilder() + .flags(1) + .uuid("aae57b2f-8e44-11ee-a3d6-a036bcda1a41") + .gno(4) + .tag("replica") + .lastCommitted(3) + .sequenceNumber(4) + .immediateCommitTimestamp(1701215692713879L) + .originalCommitTimestamp(1701215692713878L) + .transactionLength(0) + .immediateServerVersion(999999) + .build()); + + assertEquals(data.getFlags(), 0x01); + assertEquals(data.getMySqlGtid().toString(), "aae57b2f-8e44-11ee-a3d6-a036bcda1a41:replica:4"); + assertEquals(data.getMySqlGtid().getTag(), "replica"); + assertEquals(data.getLastCommitted(), 3); + assertEquals(data.getSequenceNumber(), 4); + assertEquals(data.getImmediateCommitTimestamp(), 1701215692713879L); + assertEquals(data.getOriginalCommitTimestamp(), 1701215692713878L); + } + + @Test + public void testDeserializeWithDifferentOriginalServerVersion() throws IOException { + final GtidTaggedEventData data = deserialize(eventBuilder() + .flags(0) + .uuid("bd9794e0-1d65-11ed-a7e7-0adb305b3a12") + .gno(9) + .tag("staging") + .lastCommitted(7) + .sequenceNumber(8) + .immediateCommitTimestamp(1699112309893478L) + .transactionLength(315) + .immediateServerVersion(80100) + .originalServerVersion(80099) + .build()); + + assertEquals(data.getFlags(), 0x00); + assertEquals(data.getMySqlGtid().toString(), "bd9794e0-1d65-11ed-a7e7-0adb305b3a12:staging:9"); + assertEquals(data.getMySqlGtid().getTag(), "staging"); + assertEquals(data.getLastCommitted(), 7); + assertEquals(data.getSequenceNumber(), 8); + assertEquals(data.getImmediateCommitTimestamp(), 1699112309893478L); + assertEquals(data.getOriginalCommitTimestamp(), 1699112309893478L); + assertEquals(data.getTransactionLength(), 315); + assertEquals(data.getImmediateServerVersion(), 80100); + assertEquals(data.getOriginalServerVersion(), 80099); + } + + private GtidTaggedEventData deserialize(byte[] bytes) throws IOException { + return deserializer.deserialize(new ByteArrayInputStream(bytes)); + } + + private static EventBuilder eventBuilder() { + return new EventBuilder(); + } + + private static final class EventBuilder { + private int flags; + private UUID uuid; + private long gno; + private String tag = ""; + private long lastCommitted; + private long sequenceNumber; + private long immediateCommitTimestamp; + private Long originalCommitTimestamp; + private long transactionLength; + private int immediateServerVersion; + private Integer originalServerVersion; + private Long commitGroupTicket; + + private EventBuilder flags(int flags) { + this.flags = flags; + return this; + } + + private EventBuilder uuid(String uuid) { + this.uuid = UUID.fromString(uuid); + return this; + } + + private EventBuilder gno(long gno) { + this.gno = gno; + return this; + } + + private EventBuilder tag(String tag) { + this.tag = tag; + return this; + } + + private EventBuilder lastCommitted(long lastCommitted) { + this.lastCommitted = lastCommitted; + return this; + } + + private EventBuilder sequenceNumber(long sequenceNumber) { + this.sequenceNumber = sequenceNumber; + return this; + } + + private EventBuilder immediateCommitTimestamp(long immediateCommitTimestamp) { + this.immediateCommitTimestamp = immediateCommitTimestamp; + return this; + } + + private EventBuilder originalCommitTimestamp(long originalCommitTimestamp) { + this.originalCommitTimestamp = originalCommitTimestamp; + return this; + } + + private EventBuilder transactionLength(long transactionLength) { + this.transactionLength = transactionLength; + return this; + } + + private EventBuilder immediateServerVersion(int immediateServerVersion) { + this.immediateServerVersion = immediateServerVersion; + return this; + } + + private EventBuilder originalServerVersion(int originalServerVersion) { + this.originalServerVersion = originalServerVersion; + return this; + } + + private EventBuilder commitGroupTicket(long commitGroupTicket) { + this.commitGroupTicket = commitGroupTicket; + return this; + } + + private byte[] build() throws IOException { + final java.io.ByteArrayOutputStream fields = new java.io.ByteArrayOutputStream(); + writeFieldId(fields, 0); + writeUnsignedVarLong(fields, flags); + writeFieldId(fields, 1); + writeUuid(fields, uuid); + writeFieldId(fields, 2); + writeSignedVarLong(fields, gno); + writeFieldId(fields, 3); + writeString(fields, tag); + writeFieldId(fields, 4); + writeSignedVarLong(fields, lastCommitted); + writeFieldId(fields, 5); + writeSignedVarLong(fields, sequenceNumber); + writeFieldId(fields, 6); + writeUnsignedVarLong(fields, immediateCommitTimestamp); + if (originalCommitTimestamp != null) { + writeFieldId(fields, 7); + writeUnsignedVarLong(fields, originalCommitTimestamp); + } + writeFieldId(fields, 8); + writeUnsignedVarLong(fields, transactionLength); + writeFieldId(fields, 9); + writeUnsignedVarLong(fields, immediateServerVersion); + if (originalServerVersion != null) { + writeFieldId(fields, 10); + writeUnsignedVarLong(fields, originalServerVersion); + } + if (commitGroupTicket != null) { + writeFieldId(fields, 11); + writeUnsignedVarLong(fields, commitGroupTicket); + } + + final int lastNonIgnorableFieldId = commitGroupTicket != null ? 12 : originalServerVersion != null ? 11 : 10; + int eventSize = fields.size(); + int previousEventSize; + do { + previousEventSize = eventSize; + eventSize = fields.size() + sizeUnsignedVarLong(1) + sizeUnsignedVarLong(previousEventSize) + + sizeUnsignedVarLong(lastNonIgnorableFieldId); + } while (eventSize != previousEventSize); + + final java.io.ByteArrayOutputStream event = new java.io.ByteArrayOutputStream(); + writeUnsignedVarLong(event, 1); + writeUnsignedVarLong(event, eventSize); + writeUnsignedVarLong(event, lastNonIgnorableFieldId); + fields.writeTo(event); + return event.toByteArray(); + } + + private static void writeFieldId(java.io.ByteArrayOutputStream output, int fieldId) throws IOException { + writeUnsignedVarLong(output, fieldId); + } + + private static void writeUuid(java.io.ByteArrayOutputStream output, UUID uuid) throws IOException { + writeUuidLong(output, uuid.getMostSignificantBits()); + writeUuidLong(output, uuid.getLeastSignificantBits()); + } + + private static void writeString(java.io.ByteArrayOutputStream output, String value) throws IOException { + final byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + writeUnsignedVarLong(output, bytes.length); + output.write(bytes); + } + + private static void writeSignedVarLong(java.io.ByteArrayOutputStream output, long value) throws IOException { + writeUnsignedVarLong(output, value >= 0 ? value << 1 : ((-value - 1) << 1) | 1); + } + + private static void writeUnsignedVarLong(java.io.ByteArrayOutputStream output, long value) throws IOException { + final int byteCount = sizeUnsignedVarLong(value); + output.write((int) (((1L << (byteCount - 1)) - 1) | (value << byteCount))); + if (byteCount == 1) { + return; + } + long remaining = value >>> (8 - byteCount + ((byteCount + 7) >> 4)); + for (int i = 0; i < byteCount - 1; ++i) { + output.write((int) (remaining & 0xff)); + remaining >>>= 8; + } + } + + private static int sizeUnsignedVarLong(long value) { + for (int byteCount = 1; byteCount < 9; ++byteCount) { + if ((value >>> (7 * byteCount)) == 0) { + return byteCount; + } + } + return 9; + } + + private static void writeUuidLong(java.io.ByteArrayOutputStream output, long value) throws IOException { + for (int i = 7; i >= 0; --i) { + writeUnsignedVarLong(output, (value >>> (i << 3)) & 0xff); + } + } + } +} diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java index 5a2eb616..5d80c9b6 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonBinaryValueIntegrationTest.java @@ -69,7 +69,7 @@ public class JsonBinaryValueIntegrationTest { private BinaryLogClient client; private CountDownEventListener eventListener; - private boolean isMaria = "mariadb".equals(System.getenv("MYSQL_VERSION")); + private boolean isMaria = TestDatabaseContainer.getVersion().isMaria; private TestDatabaseContainer masterContainer; @@ -607,7 +607,7 @@ public void tearDown() throws Exception { master.execute(new BinaryLogClientIntegrationTest.Callback() { @Override public void execute(Statement statement) throws SQLException { - statement.execute("drop database json_test"); + statement.execute("drop database if exists json_test"); } }); master.close();