Skip to content

Commit

Permalink
Feature/consistent cassandra schema migrations (#467)
Browse files Browse the repository at this point in the history
Improve robustness of casssandra schema migrations in 3 ways

1. wait for system schema change propagation across the cluster before
  running the next statement adding/altering tables
  to avoid errors like the trace shown below
2. increase timeouts to cassandra to reduce likelihood of migration
statement or meta inserts to fail
3. resolve DNS in case host is configured as a DNS name, not IP

Example error for 1.:

WARN  [MessagingService-Incoming-/10.233.99.138] 2018-08-30 15:28:28,310 IncomingTcpConnection.java:103 - UnknownColumnFamilyException reading from socket; closing
org.apache.cassandra.db.UnknownColumnFamilyException: Couldn't find table for cfId 57f68f40-ac69-11e8-802e-23be9e1e0824. If a table was just created, this is likely due to the schema not being fully propagated.  Please wait for schema agreement on table creation.
	at org.apache.cassandra.config.CFMetaData$Serializer.deserialize(CFMetaData.java:1517) ~[apache-cassandra-3.11.2.jar:3.11.2]
	at org.apache.cassandra.db.ReadCommand$Serializer.deserialize(ReadCommand.java:744) ~[apache-cassandra-3.11.2.jar:3.11.2]
	at org.apache.cassandra.db.ReadCommand$Serializer.deserialize(ReadCommand.java:683) ~[apache-cassandra-3.11.2.jar:3.11.2]
	at org.apache.cassandra.io.ForwardingVersionedSerializer.deserialize(ForwardingVersionedSerializer.java:50) ~[apache-cassandra-3.11.2.jar:3.11.2]
	at org.apache.cassandra.net.MessageIn.read(MessageIn.java:123) ~[apache-cassandra-3.11.2.jar:3.11.2]
	at org.apache.cassandra.net.IncomingTcpConnection.receiveMessage(IncomingTcpConnection.java:192) ~[apache-cassandra-3.11.2.jar:3.11.2]
	at org.apache.cassandra.net.IncomingTcpConnection.receiveMessages(IncomingTcpConnection.java:180) ~[apache-cassandra-3.11.2.jar:3.11.2]
	at org.apache.cassandra.net.IncomingTcpConnection.run(IncomingTcpConnection.java:94) ~[apache-cassandra-3.11.2.jar:3.11.2]
INFO  [MigrationStage:1] 2018-08-30 15:28:28,413 ColumnFamilyStore.java:411 - Initializing gundeck.meta
INFO  [MigrationStage:1] 2018-08-30 15:28:29,400 ColumnFamilyStore.java:411 - Initializing gundeck.user_push
ERROR [MigrationStage:1] 2018-08-30 15:28:30,107 CassandraDaemon.java:228 - Exception in thread Thread[MigrationStage:1,5,main]
java.lang.AssertionError: Table gundeck.push did not have any partition key columns in the schema tables
	at org.apache.cassandra.schema.SchemaKeyspace.fetchTable(SchemaKeyspace.java:1042) ~[apache-cassandra-3.11.2.jar:3.11.2]
	at org.apache.cassandra.schema.SchemaKeyspace.fetchTables(SchemaKeyspace.java:998) ~[apache-cassandra-3.11.2.jar:3.11.2]
	at org.apache.cassandra.schema.SchemaKeyspace.fetchKeyspace(SchemaKeyspace.java:957) ~[apache-cassandra-3.11.2.jar:3.11.2]
	at org.apache.cassandra.schema.SchemaKeyspace.fetchKeyspacesOnly(SchemaKeyspace.java:949) ~[apache-cassandra-3.11.2.jar:3.11.2]
	at org.apache.cassandra.schema.SchemaKeyspace.mergeSchema(SchemaKeyspace.java:1387) ~[apache-cassandra-3.11.2.jar:3.11.2]
	at org.apache.cassandra.schema.SchemaKeyspace.mergeSchemaAndAnnounceVersion(SchemaKeyspace.java:1366) ~[apache-cassandra-3.11.2.jar:3.11.2]
  • Loading branch information
jschaul authored Sep 5, 2018
1 parent 85368fb commit f009480
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 6 deletions.
2 changes: 2 additions & 0 deletions libs/cassandra-util/cassandra-util.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ library
, transformers >= 0.3
, tinylog >= 0.7
, wreq >= 0.2
, uuid
, retry

74 changes: 68 additions & 6 deletions libs/cassandra-util/src/Cassandra/Schema.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Cassandra.Schema
( Migration (..)
Expand All @@ -18,11 +19,13 @@ module Cassandra.Schema
) where

import Cassandra
import Cassandra.Settings
import Control.Applicative
import Control.Monad.Catch
import Control.Error
import Control.Monad
import Control.Monad.Catch
import Control.Monad.IO.Class
import Control.Retry
import Data.Aeson
import Data.Int
import Data.IORef
Expand All @@ -34,15 +37,17 @@ import Data.Text (Text, pack, intercalate)
import Data.Text.Lazy (fromStrict)
import Data.Text.Lazy.Builder (fromText, fromString, toLazyText)
import Data.Time.Clock
import Data.UUID (UUID)
import Data.Word
import Database.CQL.IO
import Database.CQL.Protocol (Request (..), Query (..), Response(..), Result (..))
import GHC.Generics hiding (to, from, S)
import GHC.Generics hiding (to, from, S, R)
import Options.Applicative hiding (info)
import Prelude hiding (log)
import System.Logger (Logger, Level (..), log, msg)

import qualified Data.Text.Lazy as LT
import qualified Data.List.NonEmpty as NonEmpty

data Migration = Migration
{ migVersion :: Int32
Expand Down Expand Up @@ -141,12 +146,24 @@ useKeyspace (Keyspace k) = do

migrateSchema :: Logger -> MigrationOpts -> [Migration] -> IO ()
migrateSchema l o ms = do
-- if migHost is a DNS name, resolve it and connect to all nodes
hosts <- initialContactsDNS $ pack (migHost o)
p <- Database.CQL.IO.init l $
setContacts (migHost o) []
setContacts (NonEmpty.head hosts) (NonEmpty.tail hosts)
. setPortNumber (fromIntegral $ migPort o)
. setMaxConnections 1
. setPoolStripes 1
-- 'migrationPolicy' ensures we only talk to one host for all queries
-- required for correct functioning of 'waitForSchemaConsistency'
. setPolicy migrationPolicy
-- use higher timeouts on schema migrations to reduce the probability
-- of a timeout happening during 'migAction' or 'metaInsert',
-- as that can lead to a state where schema migrations cannot be re-run
-- without manual action.
-- (due to e.g. "cannot create table X, already exists" errors)
. setConnectTimeout 20
. setSendTimeout 20
. setResponseTimeout 50
. setProtocolVersion V3
$ defSettings
runClient p $ do
Expand All @@ -166,6 +183,9 @@ migrateSchema l o ms = do
migAction
now <- liftIO getCurrentTime
write metaInsert (params All (migVersion, migText, now))
info "Waiting for schema version consistency across peers..."
waitForSchemaConsistency
info "... done waiting."
where
newer v = dropWhile (maybe (const False) (>=) v . migVersion)
. sortBy (\x y -> migVersion x `compare` migVersion y)
Expand All @@ -182,6 +202,48 @@ migrateSchema l o ms = do
metaInsert :: QueryString W (Int32, Text, UTCTime) ()
metaInsert = "insert into meta (id, version, descr, date) values (1,?,?,?)"

-- | Retrieve and compare local and peer system schema versions.
-- if they don't match, retry once per second for 30 seconds
waitForSchemaConsistency :: Client ()
waitForSchemaConsistency = do
void $ retryWhileN 30 inDisagreement getSystemVersions
where
getSystemVersions :: Client (UUID, [UUID])
getSystemVersions = do
-- These two sub-queries must be made to the same node.
-- (comparing local from node A and peers from node B wouldn't be correct)
-- using the custom 'migrationPolicy' when connecting to cassandra ensures this.
local <- systemLocalVersion
peers <- systemPeerVersions
case local of
Just localVersion -> return $ (localVersion, peers)
Nothing -> error "No system_version in system.local (should never happen)"

inDisagreement :: (UUID, [UUID]) -> Bool
inDisagreement (localVersion, peers) = not $ all (== localVersion) peers

systemLocalVersion :: Client (Maybe UUID)
systemLocalVersion = fmap runIdentity <$> qry
where
qry = retry x1 (query1 cql (params One ()))

cql :: PrepQuery R () (Identity UUID)
cql = "select schema_version from system.local"

systemPeerVersions :: Client [UUID]
systemPeerVersions = fmap runIdentity <$> qry
where
qry = retry x1 (query cql (params One ()))

cql :: PrepQuery R () (Identity UUID)
cql = "select schema_version from system.peers"

retryWhileN :: (MonadIO m) => Int -> (a -> Bool) -> m a -> m a
retryWhileN n f m = retrying (constantDelay 1000000 <> limitRetries n)
(const (return . f))
(const m)

-- | The migrationPolicy selects only one and always the same host
migrationPolicy :: IO Policy
migrationPolicy = do
h <- newIORef Nothing
Expand Down

0 comments on commit f009480

Please sign in to comment.