🎉 introduce automatic migration at the startup of server for docker environment #3980
@subodh1810 I'm really liking this. I think the main thing is to push some of the last logic behind the persistence interface, specifically the atomic replace. I think it should be the same for both the config persistence and the job persistence.
Just leaving a comment since I'd like to see that part before I approve.
@@ -171,6 +171,8 @@ | |||
*/ | |||
Map<DatabaseSchema, Stream<JsonNode>> exportDatabase() throws IOException; | |||
|
|||
Map<String, Stream<JsonNode>> exportEverythingInDefaultSchema() throws IOException; |
why is this name scoped by "default schema"? I think the contract we are looking for here is that we can ask the JobPersistence to give us all of its data as an archive. I'm not sure why mentioning the schema is important.
Keep in mind we may want to allow using different databases in the future and hide them behind this interface. I mention that case only because it may make it clearer that the PG-specific schema shouldn't be the caller's concern.
also i know in our conversations i kept saying exportEverything but probably the more common name for this method would just be dump or export.
+1 for dump()
actually what is the difference between dump and exportDatabase? they seem to be performing the same thing under the hood. related to my comment in DefaultJobPersistence.
The main difference between dump and exportDatabase is that dump downloads all the tables of all the non-system schemas available in the database, while exportDatabase only downloads the tables available in DatabaseSchema.java.
@@ -0,0 +1,79 @@ | |||
version: "3.7" |
is there any difference between this and the docker-compose file in the root of the project? I'm wondering if there's any sane way to avoid duplicating and just grab the docker-compose file at build time. might be tricky though. not sure.
This file explicitly uses the dev image while the docker-compose file uses the {VERSION} image.
can we create an issue to deduplicate the docker-compose files? I think we can maybe do some overriding instead of copying and pasting these really big yaml files. i think we should leave it for now, but want to make a note to take another look at it later.
final Optional<String> airbyteDatabaseVersion = jobPersistence.getVersion(); | ||
|
||
Optional<String> airbyteDatabaseVersion = jobPersistence.getVersion(); | ||
if (airbyteDatabaseVersion.isPresent() && !AirbyteVersion.isCompatible(airbyteVersion, airbyteDatabaseVersion.get())) { |
i think we should also check for the case where the data is at a higher version than the app. in this case i think we should just fail.
in other words...
data version | app version | action |
---|---|---|
0.24.0 | 0.24.0 | start up |
0.24.0 | 0.24.1 | start up, we assume patch versions are compatible with each other |
0.24.1 | 0.24.0 | start up, we assume patch versions are compatible with each other |
0.22.1 | 0.24.0 | migrate, then startup |
0.24.1 | 0.22.0 | fail loudly don't try to run migration; doesn't make sense |
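The table above can be sketched as a small decision function. This is only an illustration: the class name `StartupVersionCheck`, the `Action` enum, and the `decide` method are hypothetical and are not part of the PR, and the sketch assumes versions are plain numeric `major.minor.patch` strings where only major and minor matter for compatibility.

```java
/** Hypothetical helper; names are illustrative and not taken from the PR. */
final class StartupVersionCheck {

  enum Action { START_UP, MIGRATE_THEN_START_UP, FAIL }

  // Parses "major.minor.patch" and returns {major, minor}.
  // The patch component is ignored on purpose: patch versions are assumed compatible.
  private static int[] majorMinor(final String version) {
    final String[] parts = version.split("\\.");
    if (parts.length < 2) {
      throw new IllegalArgumentException("Expected major.minor[.patch], got: " + version);
    }
    return new int[] {Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
  }

  static Action decide(final String dataVersion, final String appVersion) {
    final int[] data = majorMinor(dataVersion);
    final int[] app = majorMinor(appVersion);
    final int cmp = data[0] != app[0]
        ? Integer.compare(data[0], app[0])
        : Integer.compare(data[1], app[1]);
    if (cmp == 0) {
      return Action.START_UP; // same major.minor, patch differences are fine
    }
    // Data older than the app: migrate. Data newer than the app: refuse to start.
    return cmp < 0 ? Action.MIGRATE_THEN_START_UP : Action.FAIL;
  }
}
```

Under these assumptions, `decide("0.22.1", "0.24.0")` yields `MIGRATE_THEN_START_UP` and `decide("0.24.1", "0.22.0")` yields `FAIL`, matching the last two rows of the table.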
public static void run(MigrateConfig migrateConfig) throws IOException { | ||
final Path workspaceRoot = Files.createTempDirectory(Path.of("/tmp"), "airbyte_migrate"); | ||
AirbyteVersion airbyteVersion = new AirbyteVersion(migrateConfig.getTargetVersion()); | ||
if (!airbyteVersion.getPatchVersion().equals("0")) { |
yuck. but a good call. would it make sense to move this logic into its own method? maybe a static on AirbyteVersion?
@@ -454,6 +454,22 @@ private static String current_timestamp() { | |||
return exportDatabase(DEFAULT_SCHEMA); | |||
} | |||
|
|||
@Override |
putting this comment here because the line of code i want isn't in the diff.
should the import do an atomic replace like we do for the config persistence?
} | ||
} | ||
|
||
public void deleteOrphanDirectories() { |
per the conversation we had, i think that the atomic replace in config persistence should handle this for us. ideally we shouldn't need to call this. the db should just handle it.
@@ -0,0 +1,125 @@ | |||
/* |
just a note the path for this file looks weird. i think there's one directory called "io.airbyte.test.automaticMigrationAcceptance" as opposed to "io/airbyte/test/automaticMigrationAcceptance"
import java.time.Duration; | ||
import java.util.function.Consumer; | ||
|
||
public class ImportApi { |
can you add a javadoc comment explaining why this class exists. if i recall correctly it's because the client generated by open api has a bug.
@@ -0,0 +1,62 @@ | |||
version: "3.7" |
maybe put a comment at the top of this file explaining where it came from. i think it's the docker compose file from 0.14.2 or something, right?
import org.testcontainers.containers.DockerComposeContainer; | ||
import org.testcontainers.containers.output.OutputFrame; | ||
|
||
public class MigrationAcceptanceTest { |
this test makes me feel a lot better about this project. i know it was a lot of work to get it working, but it's going to be so valuable for our reliability.
final Path workspaceRoot = Files.createTempDirectory(Path.of("/tmp"), "airbyte_migrate"); | ||
AirbyteVersion airbyteVersion = new AirbyteVersion(migrateConfig.getTargetVersion()); | ||
if (!airbyteVersion.getPatchVersion().equals("0")) { | ||
String targetVersionWithoutPatch = "" + airbyteVersion.getMajorVersion() |
nit: a String.format would make things easier to read.
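As a rough illustration of this nit, the concatenation could be replaced with a single format call. The diff is cut off after `getMajorVersion()`, so the exact target string is not visible here; the sketch below assumes the intent is `major.minor.0`, and the class and method names (`VersionStrings`, `withoutPatch`) are hypothetical.

```java
/** Hypothetical helper; the "major.minor.0" output shape is an assumption. */
final class VersionStrings {

  // Builds the version string with the patch component reset to 0.
  static String withoutPatch(final String majorVersion, final String minorVersion) {
    return String.format("%s.%s.0", majorVersion, minorVersion);
  }
}
```

Putting this in one small static method would also fit the earlier suggestion of moving the patch-handling logic out of `run`.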
return exportEverything(DEFAULT_SCHEMA); | ||
} | ||
|
||
private Map<String, Stream<JsonNode>> exportEverything(final String schema) throws IOException { |
this seems very similar to exportDatabase. is this implementation different because we don't want to reference DatabaseSchema.java since that might be at a later version? if so, I would leave a comment explaining this.
airbyte-server/build.gradle
Outdated
@@ -31,6 +31,7 @@ dependencies { | |||
implementation project(':airbyte-scheduler:models') | |||
implementation project(':airbyte-scheduler:persistence') | |||
implementation project(':airbyte-workers') | |||
implementation project(':airbyte-migration') |
sort.
|
||
|
||
ENV_VERSION=$(grep VERSION .env | xargs) | ||
ENV_VERSION=${ENV_VERSION#*=} |
I thought we discussed and thought we could avoid adding this file and call the task directly in the gradle file. It didn't work?
import org.testcontainers.containers.PostgreSQLContainer; | ||
import org.testcontainers.utility.MountableFile; | ||
|
||
public class MockAirbyteDB implements AutoCloseable { |
nit: rename this to StubAirbyteDB since we are actually instantiating something here.
|
||
// TODO: Write a test case which compares the output dump with the output of ArchiveHandler export | ||
// for the same data | ||
public class ConfigDumpExport { |
nit: add a doc string here to explain why this exists separately from the ArchiveHandler's export function
can this just replace the archive handler's export function? it seems like we should just prefer this implementation to the existing one in the ArchiveHandler.
Some users reported they've been storing their own tables/data in the Airbyte Postgres database too.
So this dump would also export their custom tables/schema too? is that going to be a problem in such a use case?
woof. i don't think it'll be a problem for now. but probably something we need to be careful about when we add our postgres version?
import org.testcontainers.containers.DockerComposeContainer; | ||
import org.testcontainers.containers.SocatContainer; | ||
|
||
public class CustomDockerComposeContainer { |
nit: doc string here with basic summary of why this class is here. went back and forth on this since we do have enough contextual info a reader will figure it out. ultimately I think it's clearer if we leave a comment here
Nice! I really appreciate the effort put into testing here; gives me more confidence nothing unexpected will bite us. General approach looks good to me. Some thoughts on readability + one comment on how we are triggering the migration tests.
Getting close!
I think you shouldn't be afraid to make changes to archive handler. I think there are shortcomings of the current implementation (e.g. validation on export doesn't seem necessary, it is very verbose in that it explicitly names each table, and it has no transactional guarantees for the config db). If we could get a good interface on the ConfigPersistence, I think I'd be more okay with not tackling the archive handler, but it feels like we're being forced to make weird tradeoffs on our persistence interface right now. These are tradeoffs I don't think we want to make because they directly mess up our ability to use other databases in the ConfigPersistence, e.g. postgres.
import org.testcontainers.containers.PostgreSQLContainer; | ||
import org.testcontainers.utility.MountableFile; | ||
|
||
public class StubAirbyteDB implements AutoCloseable { |
I'd suggest StubAirbyteDB => AirbyteDBStub. I think it makes a bit more sense to have it be a noun than a verb.
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class ConfigDumpUtil { |
can we put this behind the ConfigPersistence interface? remember we are trying to add support for postgres as a ConfigPersistence. If this isn't behind that interface then we'll have to add a switch statement to handle different database types when exporting. Ideally we'd like to hide this behind the ConfigPersistence iface so that we don't have to deal with that concern.
@@ -37,4 +37,6 @@ | |||
|
|||
<T> void writeConfig(ConfigSchema configType, String configId, T config) throws JsonValidationException, IOException; | |||
|
|||
void deleteOrphanDirectories() throws IOException; |
ConfigPersistence: keep in mind we want to create a PostgresConfigPersistence. surfacing things about directories in this interface ties the interface specifically to a single implementation (the file system one). I think you can either have a generic cleanUpOldTables function or something, but it feels kinda awkward to me. That's why I was suggesting the atomic replace here because it solves a transaction problem that already exists with the interface and gets rid of this weirdness. I think it's fine to replace part of the ImportArchive implementation with usage of atomic replace. the super verbose thing we have right now is pretty error prone anyway.
@cgardens new problem |
wahoo! we made it. looks good.
just wanted to double check that you fixed any usages of paths that look for source_definitions.yaml or destination_definitions.yaml
airbyte-server/seed.Dockerfile
Outdated
# the sole purpose of this image is to seed the data volume with the default data | ||
# that the app should have when it is first installed. | ||
# data is mapped to a volume. Check CONFIG_ROOT in .env file and docker-compoes file | ||
COPY build/resources/main/config latest_seeds/config |
new line.
@@ -13,6 +13,9 @@ RUN chmod +x wait | |||
|
|||
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar | |||
|
|||
RUN mkdir latest_seeds | |||
COPY build/resources/main/config latest_seeds |
why does this copy into latest_seeds while seed.Dockerfile copies into latest_seeds/config?
seed.Dockerfile is populating the data volume and we want the data to be present in the /data/config path in the volume. This is just the latest_seeds and is only required for automatic migration.
Looks great!
Need to test this for kube before merging |
private final Path storageRoot; | ||
|
||
public DefaultConfigPersistence(final Path storageRoot) { | ||
this(storageRoot, new JsonSchemaValidator()); | ||
public DefaultConfigPersistence(final Path storageRootConfig) { |
You shouldn't rename this argument since it still has the same name in the other ctor.
and actually calling it storageRootConfig makes it sound like it contains the config_dir suffix
} | ||
|
||
@Override | ||
public <T> void atomicConfigImport(Map<ConfigSchema, Stream<T>> configs, boolean dryRun) |
This interface will be clearer if you call it importConfigs or replaceAllConfigs.
Right now atomic doesn't really help understand what happens in the code.
@@ -119,22 +230,24 @@ public DefaultConfigPersistence(final Path storageRoot, final JsonSchemaValidato | |||
} | |||
} | |||
|
|||
private <T> void writeConfigInternal(ConfigSchema configType, String configId, T config) throws JsonValidationException, IOException { | |||
private <T> void writeConfigInternal(ConfigSchema configType, String configId, T config, Optional<Path> rootOverride) |
it is an anti-pattern to use Optional as an argument. It is supposed to be used as a return type instead. You can just pass null and have an overload without the parameter; it makes the call site cleaner.
actually the better thing to do here is to simply make these methods never use the class member for the path and only use the root argument. Maybe they can even be static if path is the only thing they interact with.
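A minimal sketch of that suggestion, assuming a file-system layout of `<root>/<configType>/<configId>.json`. The layout, the class name `ConfigPaths`, and the method `configPath` are assumptions made for illustration, not the PR's actual code. The point is only that the root is always passed in explicitly, so there is no `Optional<Path>` parameter and no fallback to a class member.

```java
import java.nio.file.Path;

/** Hypothetical helper; the directory layout is assumed for illustration. */
final class ConfigPaths {

  // Static because the only state it touches is the explicit root argument.
  static Path configPath(final Path root, final String configType, final String configId) {
    return root.resolve(configType).resolve(configId + ".json");
  }
}
```

A caller that wants the default location passes its own `storageRoot`, and a caller writing to a temporary import directory passes that directory instead.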
@@ -11,6 +11,9 @@ required: | |||
- catalog | |||
additionalProperties: false | |||
properties: | |||
prefix: |
Why is this file changing?
We didn't introduce this change back when we wrote this migration. That's why it's required.
.isCompatible(airbyteVersion, airbyteDatabaseVersion.get()) | ||
&& !isDatabaseVersionAheadOfAppVersion(airbyteVersion, airbyteDatabaseVersion.get())) { | ||
runAutomaticMigration(configRepository, jobPersistence, airbyteVersion, | ||
airbyteDatabaseVersion.get()); |
NIT: can you adjust your IJ max line length to avoid line breaks?
@@ -203,4 +206,12 @@ public void writeStandardSyncOperation(final StandardSyncOperation standardSyncO | |||
return persistence.listConfigs(ConfigSchema.STANDARD_SYNC_OPERATION, StandardSyncOperation.class); | |||
} | |||
|
|||
public <T> void atomicImportOfConfigs(Map<ConfigSchema, Stream<T>> configs, boolean dryRun) throws IOException { |
same comment for the naming
}), | ||
STANDARD_SYNC_SUMMARY("StandardSyncSummary.yaml", StandardSyncSummary.class, | ||
standardSyncSummary -> { | ||
throw new RuntimeException("No Id for StandardSyncSummary exists"); |
StandardSyncSummary doesn't have an id
|
||
static final Path KNOWN_SCHEMAS_ROOT = JsonSchemas.prepareSchemas("types", ConfigSchema.class); | ||
|
||
private final String schemaFilename; | ||
private final Class<?> className; | ||
private final Function<?, String> extractId; |
why do you use ? instead of keeping the T?
@michel-tricot since ConfigSchema is an enum, we cannot use T; that's why I had to use ?
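To illustrate the constraint being discussed: a Java enum type cannot declare a type parameter, so each constant's `T` has to be erased to `?` in the fields, while a generic constructor can still tie the class and the id extractor together per constant. The sketch below is a self-contained toy; `Workspace`, `Schema`, and `EnumGenericsSketch` are hypothetical stand-ins, not Airbyte classes.

```java
import java.util.function.Function;

/** Toy illustration; all types here are hypothetical stand-ins. */
final class EnumGenericsSketch {

  static final class Workspace {
    final String workspaceId;

    Workspace(final String workspaceId) {
      this.workspaceId = workspaceId;
    }
  }

  enum Schema {
    WORKSPACE(Workspace.class, workspace -> workspace.workspaceId);

    // The enum itself cannot be Schema<T>, so the fields use wildcards.
    private final Class<?> className;
    private final Function<?, String> extractId;

    // A generic constructor still checks that the class and extractor agree on T.
    <T> Schema(final Class<T> className, final Function<T, String> extractId) {
      this.className = className;
      this.extractId = extractId;
    }

    @SuppressWarnings("unchecked")
    <T> String getId(final T object) {
      if (className.isInstance(object)) {
        // Safe in practice: isInstance guarantees the runtime type matches.
        return ((Function<T, String>) extractId).apply(object);
      }
      throw new RuntimeException("Object: " + object + " is not instance of class " + className.getName());
    }
  }
}
```

The unchecked cast is confined to `getId`, guarded by the `isInstance` check, which mirrors the shape of the cast visible in the diff.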
return ((Function<T, String>) extractId).apply(object); | ||
} | ||
throw new RuntimeException( | ||
"Object: " + object + " is not instance of class " + getClassName().getName()); |
NIT: useless linebreak, just a setting to change in IJ
@@ -37,4 +40,8 @@ | |||
|
|||
<T> void writeConfig(ConfigSchema configType, String configId, T config) throws JsonValidationException, IOException; | |||
|
|||
<T> void atomicConfigImport(Map<ConfigSchema, Stream<T>> configs, boolean dryRun) throws IOException; | |||
|
|||
Map<String, Stream<JsonNode>> dump() throws IOException; |
For consistency with the other methods: dumpConfigs
# Conflicts: # .github/workflows/gradle.yml # docs/operator-guides/upgrading-airbyte.md
I was testing this for kube with help from @davinchia and we came across a bug in the kube implementation (issue #4399) because of which automatic migration can't run for kube. As a result, we decided that the right thing to do here would be to disable automatic migration for kube, fix the bug, and then enable automatic migration.
3. Switching over to your browser, navigate to the Admin page in the UI. Then go to the Configuration Tab. Click Export. This will download a compressed back-up archive \(gzipped tarball\) of all of your Airbyte configuration data and sync history locally. | ||
## Upgrading \(K8s\) | ||
|
||
This process is similar to the Docker upgrade process with several changes to account for the Kubernetes resources. |
This process is similar to the Docker upgrade process with several changes to account for the Kubernetes resources. | |
Airbyte Kubernetes **does not** support automatic migration. Please follow the following steps to upgrade your Airbyte Kubernetes deployment. |
done
The frontend test is failing because I have changed URL
Since these files are currently not present in the master branch, the frontend tests fail. The Kubernetes acceptance test is failing because of an already known issue in the kube acceptance test implementation. For some reason the kube acceptance test doesn't use the local dev image for the test and instead pulls it
Issue: #2598
Approach for automatic migration:
User does a docker-compose down and then pulls in the latest airbyte.
User does a docker-compose up. While the server is coming up, it checks whether the airbyte version from .env is compatible with the airbyte version in the database. If not, then we run the automatic migration. Meanwhile, the scheduler continues to wait for the database airbyte version and the .env airbyte version to be compatible.
When the automatic migration starts:
tar.gz out of this similar to the export method of archive handler
tar.gz file.
config_deprecated and the new folder with the latest data is renamed to config. Finally the config_deprecated is deleted
the database version back to initial version if we had changed it and then start the server with VersionMismatch error.
Testing: We have 2 test classes
Airbyte using version 17 images. We then populate it through the import API using the pre-existing data from v17. After populating it we shut down but don't remove the volumes. Next we spawn Airbyte again with the latest version using the dev images, with the version set to the version in the .env file. When Airbyte is spawned the second time, the server looks at the version in the database and the new version, finds that they are not compatible, and starts the automatic migration process. We assert mainly using logs and the server health check.