Skip to content

Commit

Permalink
add numparts 17 initial load and batch size 10mil
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Nov 21, 2024
1 parent 8a4fe91 commit 0cbb0df
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
2 changes: 2 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,8 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
last *protos.QRepPartition,
runUUID string,
) (*protos.QRepParitionResult, error) {
// override for ottimate(plateiq)
config.NumRowsPerPartition = max(config.NumRowsPerPartition, 10000000)
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
err := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, nil, config.ParentMirrorName)
if err != nil {
Expand Down
19 changes: 13 additions & 6 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,20 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
if creds.AWS.SessionToken != "" {
sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken)
}
query := fmt.Sprintf("INSERT INTO `%s`(%s) SELECT %s FROM s3('%s','%s','%s'%s, 'Avro')",
config.DestinationTableIdentifier, selectorStr, selectorStr, avroFileUrl,
creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart)

if err := s.connector.database.Exec(ctx, query); err != nil {
s.connector.logger.Error("Failed to insert into select for ClickHouse", slog.Any("error", err))
return 0, err
hashColName := dstTableSchema[0].Name()
numParts := 17
for i := 0; i < numParts; i++ {
whereClause := fmt.Sprintf("cityHash64(%s) %% %d = %d", hashColName, numParts, i)
query := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM s3('%s','%s','%s'%s, 'Avro') WHERE %s SETTINGS throw_on_max_partitions_per_insert_block = 0",
config.DestinationTableIdentifier, selectorStr, selectorStr, avroFileUrl,
creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart, whereClause)
s.connector.logger.Info("running query", slog.String("query", query), slog.Int("part", i), slog.Int("numParts", numParts))
err := s.connector.database.Exec(ctx, query)
if err != nil {
s.connector.logger.Error("failed to execute query", slog.String("query", query), slog.Int("part", i), slog.Int("numParts", numParts), slog.Any("error", err))
return 0, err
}
}

return avroFile.NumRecords, nil
Expand Down

0 comments on commit 0cbb0df

Please sign in to comment.