From 26756854a665570be89e12fdfdf7c5dff35483ae Mon Sep 17 00:00:00 2001 From: speedcom Date: Tue, 5 Dec 2017 08:50:23 +0100 Subject: [PATCH 1/9] reorganize build.sbt definition --- build.sbt | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index a8d0922..fd23635 100644 --- a/build.sbt +++ b/build.sbt @@ -39,7 +39,9 @@ initialize := { assert(current == required, s"Unsupported build JDK: java.specification.version $current != $required") } -// PROJECT DEFINITIONS +// ***************************************************************************** +// PROJECTS +// ***************************************************************************** lazy val root = (project in file(".")) .enablePlugins(BuildInfoPlugin, SbtMultiJvm, JavaServerAppPackaging) .configs(MultiJvm) @@ -53,8 +55,10 @@ lazy val root = (project in file(".")) buildInfoKeys := Seq[BuildInfoKey](name, version, scalaVersion, sbtVersion, git.gitHeadCommit, git.gitCurrentBranch), buildInfoOptions += BuildInfoOption.ToJson ) - .settings(versionWithGit) - .settings(git.useGitDescribe := true) + .settings( + versionWithGit, + git.useGitDescribe := true + ) .settings(configAnnotationSettings) .aggregate(core, httpApi, storageInMem, storageRocksDB) .dependsOn(core, httpApi, storageInMem, storageRocksDB) @@ -132,7 +136,21 @@ lazy val storageRocksDB = (project in file("justin-storage-rocksdb")) ) .dependsOn(storageApi) -// ALIASES + +// ***************************************************************************** +// Settings +// ***************************************************************************** +lazy val configAnnotationSettings: Seq[sbt.Setting[_]] = { + Seq( + scalacOptions += "-Xmacro-settings:conf.output.dir=" + baseDirectory.value.getAbsolutePath + "/src/main/resources", + addCompilerPlugin(Library.macroParadise cross CrossVersion.full), + libraryDependencies += Library.configAnnotation + ) +} + +// ***************************************************************************** +// Aliases +// ***************************************************************************** addCommandAlias("compileAll", ";compile;test:compile;multi-jvm:compile") addCommandAlias("testAll", ";test:test;multi-jvm:test") From de2deeaaf96b7ed6ce84b5166cd9c38b6516a5cf Mon Sep 17 00:00:00 2001 From: speedcom Date: Tue, 5 Dec 2017 09:03:14 +0100 Subject: [PATCH 2/9] define justin-spli-brain-resolver sbt sub-project --- build.sbt | 9 +++++++++ project/Dependencies.scala | 2 ++ 2 files changed, 11 insertions(+) diff --git a/build.sbt b/build.sbt index fd23635..c4ccbf7 100644 --- a/build.sbt +++ b/build.sbt @@ -136,6 +136,15 @@ lazy val storageRocksDB = (project in file("justin-storage-rocksdb")) ) .dependsOn(storageApi) +lazy val splitBrainResolver = (project in file("justin-split-brain-resolver")) + .enablePlugins(SbtMultiJvm) + .configs(MultiJvm) + .disablePlugins(RevolverPlugin) + .settings( + name := "justin-split-brain-resolver", + scalaVersion := Version.scala, + libraryDependencies ++= Dependencies.splitBrainResolver + ) // ***************************************************************************** // Settings diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 5f32ec7..dc2ec93 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -73,5 +73,7 @@ object Dependencies { val storageLogDBExperimental = genericTest val storageRocksDB = Seq(rocksdb, rocksdb % "test", kryo) ++ genericTest + val splitBrainResolver = genericTest + val root = core ++ httpApi ++ storageApi } From 05b5d63ef51fe35e7ac5701da2bd799af3ddbf61 Mon Sep 17 00:00:00 2001 From: speedcom Date: Thu, 7 Dec 2017 18:45:21 +0100 Subject: [PATCH 3/9] add TanUkkii007's akka-cluster-custom-downing library to splir-brain-resolver project --- build.sbt | 3 +++ project/Dependencies.scala | 9 +++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index c4ccbf7..72a7301 100644 --- a/build.sbt +++ b/build.sbt @@ -140,6 +140,9 @@ lazy val splitBrainResolver = (project in file("justin-split-brain-resolver")) .enablePlugins(SbtMultiJvm) .configs(MultiJvm) .disablePlugins(RevolverPlugin) + .settings( + resolvers += Resolver.bintrayRepo("tanukkii007", "maven") + ) .settings( name := "justin-split-brain-resolver", scalaVersion := Version.scala, diff --git a/project/Dependencies.scala b/project/Dependencies.scala index dc2ec93..7cdaece 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,6 +17,7 @@ object Version { val configAnnotation = "0.3.7" val macroParadise = "2.1.1" val rocksDB = "5.5.1" + val akkaClusterDowning = "0.0.9" } object Library { @@ -52,6 +53,9 @@ object Library { // storage val rocksdb = "org.rocksdb" % "rocksdbjni" % Version.rocksDB val kryo = "com.esotericsoftware" % "kryo" % Version.kryo % "provided" + + // split-brain resolver + val akkaClusterDowning = "com.github.TanUkkii007" %% "akka-cluster-custom-downing" % Version.akkaClusterDowning } object Dependencies { @@ -63,17 +67,14 @@ object Dependencies { private val akkaHttpCommon = Seq(akkaHttp, akkaHttpSprayJson, akkaHttpTestkit) private val akkaClusterCommon = Seq(akkaRemote, akkaMultiNodeTestkit % "multi-jvm", akkaCluster, akkaClusterMetrics, akkaClusterTools, kamonSigar, akkaClusterManager) + // projects val core = akkaCommon ++ akkaClusterCommon ++ genericTest ++ Seq(scalacheck % "test", logback, scalaLogging) ++ Seq(akkaHttpSprayJson) val ring = genericTest val vectorClocks = genericTest val httpApi = akkaCommon ++ akkaHttpCommon ++ genericTest - val storageApi = genericTest val storageInMem = genericTest - val storageLogDBExperimental = genericTest val storageRocksDB = Seq(rocksdb, rocksdb % "test", kryo) ++ genericTest - val splitBrainResolver = genericTest - val root = core ++ httpApi ++ storageApi } From 0a84a432a676f8f40c62a68c4eabf2a4dd77dec0 Mon Sep 17 00:00:00 2001 From: speedcom Date: Thu, 7 Dec 2017 19:05:04 +0100 Subject: [PATCH 4/9] add type to public values --- project/Dependencies.scala | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7cdaece..37e4205 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -61,20 +61,19 @@ object Library { object Dependencies { import Library._ - private val genericTest = Seq(scalactic, scalatest % "test") - + private val genericTest = Seq(scalactic, scalatest % "test") private val akkaCommon = Seq(akkaActor, akkaSfl4j, akkaTestkit, akkaKryo, akkaStream) private val akkaHttpCommon = Seq(akkaHttp, akkaHttpSprayJson, akkaHttpTestkit) private val akkaClusterCommon = Seq(akkaRemote, akkaMultiNodeTestkit % "multi-jvm", akkaCluster, akkaClusterMetrics, akkaClusterTools, kamonSigar, akkaClusterManager) // projects - val core = akkaCommon ++ akkaClusterCommon ++ genericTest ++ Seq(scalacheck % "test", logback, scalaLogging) ++ Seq(akkaHttpSprayJson) - val ring = genericTest - val vectorClocks = genericTest - val httpApi = akkaCommon ++ akkaHttpCommon ++ genericTest - val storageApi = genericTest - val storageInMem = genericTest - val storageRocksDB = Seq(rocksdb, rocksdb % "test", kryo) ++ genericTest - val splitBrainResolver = genericTest - val root = core ++ httpApi ++ storageApi + val core: Seq[ModuleID] = akkaCommon ++ akkaClusterCommon ++ genericTest ++ Seq(scalacheck % "test", logback, scalaLogging) ++ Seq(akkaHttpSprayJson) + val ring: Seq[ModuleID] = genericTest + val vectorClocks: Seq[ModuleID] = genericTest + val httpApi: Seq[ModuleID] = akkaCommon ++ akkaHttpCommon ++ genericTest + val storageApi: Seq[ModuleID] = genericTest + val storageInMem: Seq[ModuleID] = genericTest + val storageRocksDB: Seq[ModuleID] = Seq(rocksdb, rocksdb % "test", kryo) ++ genericTest + val splitBrainResolver: Seq[ModuleID] = Seq(akkaClusterDowning) ++ genericTest + val root: Seq[ModuleID] = core ++ httpApi ++ storageApi } From 2d0ac22de89d8bcaabd82d1b2918706e97faa35a Mon Sep 17 00:00:00 2001 From: speedcom Date: Fri, 8 Dec 2017 21:33:49 +0100 Subject: [PATCH 5/9] set MajorityLeaderAutoDowning as a downing provider of akka cluster --- .../src/main/resources/reference.conf | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 justin-split-brain-resolver/src/main/resources/reference.conf diff --git a/justin-split-brain-resolver/src/main/resources/reference.conf b/justin-split-brain-resolver/src/main/resources/reference.conf new file mode 100644 index 0000000..f140ad3 --- /dev/null +++ b/justin-split-brain-resolver/src/main/resources/reference.conf @@ -0,0 +1,11 @@ +akka.cluster.downing-provider-class = "tanukki.akka.cluster.autodown.MajorityLeaderAutoDowning" + +custom-downing { + stable-after = 20s + + majority-leader-auto-downing { + majority-member-role = "" + down-if-in-minority = true + shutdown-actor-system-on-resolution = true + } +} \ No newline at end of file From 1b43f200649d3049f8b1c67acb9b6de198907275 Mon Sep 17 00:00:00 2001 From: speedcom Date: Fri, 8 Dec 2017 21:36:19 +0100 Subject: [PATCH 6/9] add split-brain-resolver project to root --- build.sbt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index 72a7301..5879205 100644 --- a/build.sbt +++ b/build.sbt @@ -60,8 +60,8 @@ lazy val root = (project in file(".")) git.useGitDescribe := true ) .settings(configAnnotationSettings) - .aggregate(core, httpApi, storageInMem, storageRocksDB) - .dependsOn(core, httpApi, storageInMem, storageRocksDB) + .aggregate(core, httpApi, storageInMem, storageRocksDB, splitBrainResolver) + .dependsOn(core, httpApi, storageInMem, storageRocksDB, splitBrainResolver) lazy val core = (project in file("justin-core")) .disablePlugins(RevolverPlugin) From 450225a84cf0581c80a2885947703f87823cf975 Mon Sep 17 00:00:00 2001 From: speedcom Date: Sun, 10 Dec 2017 19:58:17 +0100 Subject: [PATCH 7/9] add helper method to multi-jvm spec test: awaitMembersUp and address --- .../justin/db/MultiNodeClusterSpec.scala | 47 ++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/src/multi-jvm/scala/justin/db/MultiNodeClusterSpec.scala b/src/multi-jvm/scala/justin/db/MultiNodeClusterSpec.scala index fa8b89d..6f20e89 100644 --- a/src/multi-jvm/scala/justin/db/MultiNodeClusterSpec.scala +++ b/src/multi-jvm/scala/justin/db/MultiNodeClusterSpec.scala @@ -1,10 +1,16 @@ package justin.db +import java.util.concurrent.ConcurrentHashMap + +import akka.actor.Address import akka.cluster.Cluster +import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeSpec import com.typesafe.config.{Config, ConfigFactory} import org.scalatest.Suite +import scala.concurrent.duration.{FiniteDuration, _} + object MultiNodeClusterSpec { val commonBaseConfig: Config = ConfigFactory.parseString( @@ -20,10 +26,49 @@ object MultiNodeClusterSpec { trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeSpec ⇒ + private val cachedAddresses = new ConcurrentHashMap[RoleName, Address] + + def initialParticipants: Int = roles.size + /** * Get the cluster node to use. */ def cluster: Cluster = Cluster(system) - def initialParticipants: Int = roles.size + /** + * Lookup the Address for the role. + * + * Implicit conversion from RoleName to Address. + * + * It is cached, which has the implication that stopping + * and then restarting a role (jvm) with another address is not + * supported. + */ + implicit def address(role: RoleName): Address = { + cachedAddresses.get(role) match { + case null ⇒ + val address = node(role).address + cachedAddresses.put(role, address) + address + case address ⇒ address + } + } + + /** + * Wait until the expected number of members has status Up has been reached. + * Also asserts that nodes in the 'canNotBePartOfMemberRing' are *not* part of the cluster ring. + */ + def awaitMembersUp(numberOfMembers: Int, canNotBePartOfMemberRing: Set[Address] = Set(), timeout: FiniteDuration = 10.seconds): Unit = { + awaitAssert( + a = { + cluster.state.members.size shouldBe numberOfMembers + + println("cluster state: " + cluster.state) + + cluster.state.members.map(_.address) intersect canNotBePartOfMemberRing shouldBe empty + }, + max = timeout, + interval = 2.seconds + ) + } } From 22579f40029fb4fefc56e89efd73413ff26dfa21 Mon Sep 17 00:00:00 2001 From: speedcom Date: Sun, 10 Dec 2017 19:58:57 +0100 Subject: [PATCH 8/9] test that split-brain resolver autodown exited node from cluster --- .../justin/db/SplitBrainResolverSpec.scala | 92 +++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 src/multi-jvm/scala/justin/db/SplitBrainResolverSpec.scala diff --git a/src/multi-jvm/scala/justin/db/SplitBrainResolverSpec.scala b/src/multi-jvm/scala/justin/db/SplitBrainResolverSpec.scala new file mode 100644 index 0000000..d724c23 --- /dev/null +++ b/src/multi-jvm/scala/justin/db/SplitBrainResolverSpec.scala @@ -0,0 +1,92 @@ +package justin.db + +import akka.remote.testconductor.RoleName +import akka.remote.testkit.{MultiNodeConfig, MultiNodeSpec} +import com.typesafe.config.ConfigFactory +import scala.concurrent.duration._ +import akka.testkit.TestDuration + +final class SplitBrainResolverConfig extends MultiNodeConfig with DockerEtcd { + val first: RoleName = role("first") + val second: RoleName = role("second") + val third: RoleName = role("third") + val fourth: RoleName = role("fourth") + val fifth: RoleName = role("fifth") + + private[this] val allRoles = List(first, second, third, fourth, fifth) + private[this] val clusterName = "SplitBrainResolverSpec" + + private[this] def commonNodeConfig(id: Int) = ConfigFactory.parseString( + s""" + |justin.system = $clusterName + |justin.node-id = $id + |justin.http.port = ${9000 + id} + |akka.cluster.role.storagenode.min-nr-of-members = ${allRoles.size} + |akka.remote.netty.tcp.port = 0 + |akka.remote.netty.tcp.hostname = "localhost" + |akka.remote.netty.tcp.bind-hostname = "0.0.0.0" + |akka.remote.netty.tcp.bind-port = 0 + |akka.cluster.http.management.port = ${19999 + id} + """.stripMargin + ) + + testTransport(on = true) + + commonConfig { + MultiNodeClusterSpec + .commonBaseConfig + .withFallback(JustinDBConfig.init.config) + } + + allRoles.zipWithIndex.foreach { case (roleName, id) => + nodeConfig(roleName)(commonNodeConfig(id)) + } +} + +final class SplitBrainResolverSpecMultiJvmNode1 extends SplitBrainResolverSpec +final class SplitBrainResolverSpecMultiJvmNode2 extends SplitBrainResolverSpec +final class SplitBrainResolverSpecMultiJvmNode3 extends SplitBrainResolverSpec +final class SplitBrainResolverSpecMultiJvmNode4 extends SplitBrainResolverSpec +final class SplitBrainResolverSpecMultiJvmNode5 extends SplitBrainResolverSpec + +abstract class SplitBrainResolverSpec(config: SplitBrainResolverConfig) + extends MultiNodeSpec(config) + with MultiNodeClusterSpec { + import config._ + + def this() = this(new SplitBrainResolverConfig()) + + "The majority leader in a 5 node cluster" must { + + "be able to DOWN a 'last' node that is UNREACHABLE" in within(150.seconds) { + val config = new JustinDBConfig(system.settings.config) + val justinDB = JustinDB.init(config) + + enterBarrier("justindb-cluster-up") + + val fifthAddress = address(fifth) + + runOn(first) { + // kill 'fifth' node + testConductor.exit(fifth, 0).await + enterBarrier("down-fifth-node") + + // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- + + awaitMembersUp(numberOfMembers = 4, canNotBePartOfMemberRing = Set(fifthAddress), 30.seconds.dilated) + } + + runOn(fifth) { + enterBarrier("down-fifth-node") + } + + runOn(second, third, fourth) { + enterBarrier("down-fifth-node") + + awaitMembersUp(numberOfMembers = 4, canNotBePartOfMemberRing = Set(fifthAddress), 30.seconds.dilated) + } + + enterBarrier("await-completion-1") + } + } +} From 37170b7c1e1564d3edd4d06199c4251bcb5a6aaf Mon Sep 17 00:00:00 2001 From: speedcom Date: Wed, 7 Feb 2018 23:01:01 +0100 Subject: [PATCH 9/9] rebase --- build.sbt | 15 ++-------- .../justin/db/MultiNodeClusterSpec.scala | 1 + .../justin/db/SplitBrainResolverSpec.scala | 29 ++++++++----------- 3 files changed, 16 insertions(+), 29 deletions(-) diff --git a/build.sbt b/build.sbt index 5879205..58ad586 100644 --- a/build.sbt +++ b/build.sbt @@ -149,24 +149,15 @@ lazy val splitBrainResolver = (project in file("justin-split-brain-resolver")) libraryDependencies ++= Dependencies.splitBrainResolver ) -// ***************************************************************************** -// Settings -// ***************************************************************************** -lazy val configAnnotationSettings: Seq[sbt.Setting[_]] = { - Seq( - scalacOptions += "-Xmacro-settings:conf.output.dir=" + baseDirectory.value.getAbsolutePath + "/src/main/resources", - addCompilerPlugin(Library.macroParadise cross CrossVersion.full), - libraryDependencies += Library.configAnnotation - ) -} - // ***************************************************************************** // Aliases // ***************************************************************************** addCommandAlias("compileAll", ";compile;test:compile;multi-jvm:compile") addCommandAlias("testAll", ";test:test;multi-jvm:test") -// SETTINGS +// ***************************************************************************** +// Settings +// ***************************************************************************** lazy val commonSettings = Def.settings( compileSettings ) diff --git a/src/multi-jvm/scala/justin/db/MultiNodeClusterSpec.scala b/src/multi-jvm/scala/justin/db/MultiNodeClusterSpec.scala index 6f20e89..7332f6f 100644 --- a/src/multi-jvm/scala/justin/db/MultiNodeClusterSpec.scala +++ b/src/multi-jvm/scala/justin/db/MultiNodeClusterSpec.scala @@ -70,5 +70,6 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS max = timeout, interval = 2.seconds ) + () } } diff --git a/src/multi-jvm/scala/justin/db/SplitBrainResolverSpec.scala b/src/multi-jvm/scala/justin/db/SplitBrainResolverSpec.scala index d724c23..67fc592 100644 --- a/src/multi-jvm/scala/justin/db/SplitBrainResolverSpec.scala +++ b/src/multi-jvm/scala/justin/db/SplitBrainResolverSpec.scala @@ -6,7 +6,7 @@ import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ import akka.testkit.TestDuration -final class SplitBrainResolverConfig extends MultiNodeConfig with DockerEtcd { +final class SplitBrainResolverConfig extends MultiNodeConfig { val first: RoleName = role("first") val second: RoleName = role("second") val third: RoleName = role("third") @@ -18,25 +18,20 @@ final class SplitBrainResolverConfig extends MultiNodeConfig with DockerEtcd { private[this] def commonNodeConfig(id: Int) = ConfigFactory.parseString( s""" - |justin.system = $clusterName - |justin.node-id = $id - |justin.http.port = ${9000 + id} + |justin.system = $clusterName + |justin.kubernetes-hostname = s"justindb-$id" + |justin.http.port = ${9000 + id} |akka.cluster.role.storagenode.min-nr-of-members = ${allRoles.size} - |akka.remote.netty.tcp.port = 0 - |akka.remote.netty.tcp.hostname = "localhost" - |akka.remote.netty.tcp.bind-hostname = "0.0.0.0" - |akka.remote.netty.tcp.bind-port = 0 - |akka.cluster.http.management.port = ${19999 + id} + |akka.cluster.http.management.port = ${19999 + id} + |akka.cluster.seed-nodes.0 = "akka.trttl.gremlin.tcp://$clusterName@localhost:25551" + |akka.remote.netty.tcp.port = ${25551 + id} + |akka.remote.netty.tcp.hostname = "localhost" + |akka.remote.netty.tcp.applied-adapters = [trttl, gremlin] + |akka.remote.artery.advanced.test-mode = on """.stripMargin ) - testTransport(on = true) - - commonConfig { - MultiNodeClusterSpec - .commonBaseConfig - .withFallback(JustinDBConfig.init.config) - } + commonConfig(MultiNodeClusterSpec.commonBaseConfig.withFallback(JustinDBConfig.init.config)) allRoles.zipWithIndex.foreach { case (roleName, id) => nodeConfig(roleName)(commonNodeConfig(id)) @@ -60,7 +55,7 @@ abstract class SplitBrainResolverSpec(config: SplitBrainResolverConfig) "be able to DOWN a 'last' node that is UNREACHABLE" in within(150.seconds) { val config = new JustinDBConfig(system.settings.config) - val justinDB = JustinDB.init(config) + val justinDB = JustinDB.init(config)(system) enterBarrier("justindb-cluster-up")