Skip to content

Clickhouse

Jaafar El Harouchi edited this page Apr 16, 2019 · 1 revision

Server

endpoint12.a4g.com

Flow

Definition

-- Final Tables

CREATE DATABASE IF NOT EXISTS BidRequest;

CREATE TABLE IF NOT EXISTS BidRequest.mediaPlanning
(
    event_date  Date,
    ssp         String,
    siteAppName String,
    country     FixedString(3),
    requestType String,
    conType     UInt8,
    deviceOs    String,
    deviceType  UInt8,
    carrier     String,
    siteAppId   String,
    isApp       UInt8,
    browser     String,
    language    FixedString(2),
    iab         String,
    size        String,
    minCPM      AggregateFunction(avg, Float32),
    MTOTAL      UInt64,
    lastUpdated AggregateFunction(max, DateTime)
)
    engine = SummingMergeTree()
        PARTITION BY toYYYYMMDD(event_date)
        ORDER BY (event_date, country, requestType, conType, isApp, deviceOs, deviceType,
                  carrier, browser, size, ssp, siteAppName, siteAppId, language, iab);

CREATE TABLE IF NOT EXISTS BidRequest.statsByDay
(
    event_date  Date,
    server      String,
    ssp         String,
    visits      UInt64,
    lastUpdated AggregateFunction(max, DateTime)
)
    engine = SummingMergeTree()
        PARTITION BY toYYYYMM(event_date)
        ORDER BY (event_date, server, ssp);


CREATE TABLE IF NOT EXISTS BidRequest.statsByDay
(
    event_date  Date,
    ssp         String,
    server      String,
    visits      UInt64,
    lastUpdated AggregateFunction(max, DateTime)
)
    engine = SummingMergeTree()
        PARTITION BY toYYYYMM(event_date)
        ORDER BY (event_date, server, ssp);


CREATE DATABASE if not exists dmp;


CREATE TABLE IF NOT EXISTS dmp.users
(
    country     FixedString(3),
    userId      String,
    conType     UInt8,
    carrier     String,
    gender      AggregateFunction(max, FixedString(1)),
    yob         AggregateFunction(max, UInt16),
    visits      UInt64,
    created     AggregateFunction(min, datetime),
    lastUpdated AggregateFunction(max, datetime)
) ENGINE = SummingMergeTree()
      PARTITION BY substring(country, 1, 1)
      ORDER BY (country, userId, conType, carrier);

CREATE TABLE IF NOT EXISTS dmp.interests
(
    country     FixedString(3),
    iab         String,
    userId      String,
    visits      UInt64,
    lastUpdated AggregateFunction(max, datetime)
) ENGINE = SummingMergeTree()
      PARTITION BY substring(country, 1, 1)
      ORDER BY (country, iab, userId);


-- Incoming Kafka Queues

CREATE TABLE IF NOT EXISTS BidRequest.usQueue
(
    id             String,
    event_datetime DateTime,
    ssp            String,
    siteAppName    String,
    country        FixedString(3),
    requestType    String,
    conType        UInt8,
    deviceOs       String,
    deviceType     UInt8,
    carrier        String,
    siteAppId      String,
    isApp          UInt8,
    server         String,
    userId         String,
    lat            Float32,
    long           Float32,
    browser        String,
    language       FixedString(2),
    iab            String,
    size           String,
    minCPM         Float32,
    gender         FixedString(1),
    yob            UInt16,
    url            String
)
    engine = Kafka
        SETTINGS kafka_broker_list = '192.168.6.27:9092,192.168.6.28:9092,192.168.6.29:9092',
            kafka_topic_list = 'dsp-BidRequests',
            kafka_group_name = 'clickhouse-BidRequests',
            kafka_skip_broken_messages = 100000,
            kafka_format = 'TSV',
            kafka_row_delimiter = '\n',
            kafka_num_consumers = 11,
            kafka_max_block_size = 131072;

CREATE TABLE IF NOT EXISTS BidRequest.euQueue
(
    id             String,
    event_datetime DateTime,
    ssp            String,
    siteAppName    String,
    country        FixedString(3),
    requestType    String,
    conType        UInt8,
    deviceOs       String,
    deviceType     UInt8,
    carrier        String,
    siteAppId      String,
    isApp          UInt8,
    server         String,
    userId         String,
    lat            Float32,
    long           Float32,
    browser        String,
    language       FixedString(2),
    iab            String,
    size           String,
    minCPM         Float32,
    gender         FixedString(1),
    yob            UInt16,
    url            String
)
    engine = Kafka
        SETTINGS kafka_broker_list = '195.181.167.158:9092,195.181.167.142:9092,195.181.167.140:9092',
            kafka_topic_list = 'dsp-BidRequests',
            kafka_group_name = 'clickhouse-BidRequests',
            kafka_skip_broken_messages = 10000,
            kafka_format = 'TSV',
            kafka_row_delimiter = '\n',
            kafka_num_consumers = 3
--,kafka_max_block_size = 32768
;


-- Materialized Views : Processors


CREATE MATERIALIZED VIEW IF NOT EXISTS BidRequest.usTraffic
    (id String,
     event_datetime DateTime,
     ssp String,
     siteAppName String,
     country FixedString(3),
     requestType String,
     conType UInt8,
     deviceOs String,
     deviceType UInt8,
     carrier String,
     siteAppId String,
     isApp UInt8,
     server String,
     userId String,
     browser String,
     language FixedString(2),
     iab String,
     size String,
     minCPM Float32,
     gender FixedString(1),
     yob UInt16,
     url String)
    ENGINE = MergeTree()
        PARTITION BY toYYYYMMDD(event_datetime)
        ORDER BY (event_datetime,
                  ssp,
                  siteAppName,
                  country,
                  requestType,
                  conType,
                  deviceOs,
                  deviceType,
                  carrier,
                  siteAppId,
                  server,
                  userId,
                  browser,
                  language,
                  iab,
                  size,
                  minCPM,
                  gender,
                  yob,
                  url)
AS
SELECT DISTINCT id,
                event_datetime,
                ssp,
                siteAppName,
                country,
                requestType,
                conType,
                deviceOs,
                deviceType,
                carrier,
                siteAppId,
                isApp,
                server,
                userId,
                browser,
                language,
                iab,
                size,
                minCPM,
                gender,
                yob,
                url
FROM BidRequest.usQueue;

CREATE MATERIALIZED VIEW IF NOT EXISTS BidRequest.euTraffic
    (id String,
     event_datetime DateTime,
     ssp String,
     siteAppName String,
     country FixedString(3),
     requestType String,
     conType UInt8,
     deviceOs String,
     deviceType UInt8,
     carrier String,
     siteAppId String,
     isApp UInt8,
     server String,
     userId String,
     browser String,
     language FixedString(2),
     iab String,
     size String,
     minCPM Float32,
     gender FixedString(1),
     yob UInt16,
     url String)
    ENGINE = MergeTree()
        PARTITION BY toYYYYMMDD(event_datetime)
        ORDER BY (event_datetime,
                  ssp,
                  siteAppName,
                  country,
                  requestType,
                  conType,
                  deviceOs,
                  deviceType,
                  carrier,
                  siteAppId,
                  server,
                  userId,
                  browser,
                  language,
                  iab,
                  size,
                  minCPM,
                  gender,
                  yob,
                  url)
AS
SELECT DISTINCT id,
                event_datetime,
                ssp,
                siteAppName,
                country,
                requestType,
                conType,
                deviceOs,
                deviceType,
                carrier,
                siteAppId,
                isApp,
                server,
                userId,
                browser,
                language,
                iab,
                size,
                minCPM,
                gender,
                yob,
                url
FROM BidRequest.euQueue;

CREATE MATERIALIZED VIEW IF NOT EXISTS BidRequest.usMediaPlanning
    TO BidRequest.mediaPlanning
AS
SELECT toDate(event_datetime)   AS event_date,
       ssp,
       siteAppName,
       country,
       requestType,
       conType,
       deviceOs,
       deviceType,
       carrier,
       siteAppId,
       isApp,
       browser,
       language,
       iab,
       size,
       avgState(minCPM)         AS minCPM,
       count()                  AS MTOTAL,
       maxState(event_datetime) AS lastUpdated
FROM BidRequest.usTraffic
GROUP BY event_date, ssp, siteAppName, country, requestType, conType, deviceOs, deviceType, carrier, siteAppId, isApp,
         browser, language, size, iab;

CREATE MATERIALIZED VIEW IF NOT EXISTS BidRequest.euMediaPlanning
    TO BidRequest.mediaPlanning
AS
SELECT toDate(event_datetime)   AS event_date,
       ssp,
       siteAppName,
       country,
       requestType,
       conType,
       deviceOs,
       deviceType,
       carrier,
       siteAppId,
       isApp,
       browser,
       language,
       iab,
       size,
       avgState(minCPM)         AS minCPM,
       count()                  AS MTOTAL,
       maxState(event_datetime) AS lastUpdated
FROM BidRequest.euTraffic
GROUP BY event_date, ssp, siteAppName, country, requestType, conType, deviceOs, deviceType, carrier, siteAppId, isApp,
         browser, language, size, iab;


CREATE MATERIALIZED VIEW IF NOT EXISTS BidRequest.usStats
    TO BidRequest.statsByDay
AS
SELECT toDate(event_datetime)   AS event_date,
       server,
       ssp,
       count()                  AS visits,
       maxState(event_datetime) AS lastUpdated
FROM BidRequest.usTraffic
GROUP BY event_date, server, ssp;

CREATE MATERIALIZED VIEW IF NOT EXISTS BidRequest.euStats
    TO BidRequest.statsByDay
AS
SELECT toDate(event_datetime)   AS event_date,
       server,
       ssp,
       count()                  AS visits,
       maxState(event_datetime) AS lastUpdated
FROM BidRequest.euTraffic
GROUP BY event_date, server, ssp;


CREATE MATERIALIZED VIEW IF NOT EXISTS dmp.usUsers
    TO dmp.users
AS
SELECT country,
       userId,
       conType,
       carrier,
       maxState(gender)         as gender,
       maxState(yob)            as yob,
       count()                  as visits,
       minState(event_datetime) as created,
       maxState(event_datetime) as lastUpdated
FROM BidRequest.usTraffic
WHERE requestType != 'popunder'
  AND notEmpty(country)
  AND userId LIKE '___%:__________%'
GROUP BY country, userId, conType, carrier;

CREATE MATERIALIZED VIEW IF NOT EXISTS dmp.euUsers
    TO dmp.users
AS
SELECT country,
       userId,
       conType,
       carrier,
       maxState(gender)         as gender,
       maxState(yob)            as yob,
       count()                  as visits,
       minState(event_datetime) as created,
       maxState(event_datetime) as lastUpdated
FROM BidRequest.euTraffic
WHERE requestType != 'popunder'
  AND notEmpty(country)
  AND userId LIKE '___%:__________%'
GROUP BY country, userId, conType, carrier;

CREATE MATERIALIZED VIEW IF NOT EXISTS dmp.usInterests
    TO dmp.interests
AS
SELECT country,
       arrayJoin(splitByChar(',', iab)) as iab,
       userId,
       count()                          as visits,
       maxState(event_datetime)         as lastUpdated
FROM BidRequest.usTraffic
WHERE notEmpty(iab)
  AND userId LIKE '___%:____________%'
  and country in ('MAR', 'USA')
GROUP BY country, iab, userId
HAVING iab like 'IAB%';

CREATE MATERIALIZED VIEW IF NOT EXISTS dmp.euInterests
    TO dmp.interests
AS
SELECT country,
       arrayJoin(splitByChar(',', iab)) as iab,
       userId,
       count()                          as visits,
       maxState(event_datetime)         as lastUpdated
FROM BidRequest.euTraffic
WHERE notEmpty(iab)
  AND userId LIKE '___%:____________%'
  and country in ('MAR', 'USA')
GROUP BY country, iab, userId
HAVING iab like 'IAB%';


-- Helper Views

create view if not exists BidRequest.serverStatus
as
select server, maxMerge(lastUpdated) as lastUpdated
from BidRequest.statsByDay
where server != 'endpoint12'
group by server
order by lastUpdated desc;

CREATE VIEW BidRequest.Top3
as
    select 'SSP' as Field, topKWeighted(3)(ssp, MTOTAL) as Top3
    from BidRequest.mediaPlanning
    union all
    select 'OS' as Field, topKWeighted(3)(deviceOs, MTOTAL) as Top3
    from BidRequest.mediaPlanning
    union all
    select 'Browser' as Field, topKWeighted(3)(browser, MTOTAL) as Top3
    from BidRequest.mediaPlanning
    union all
    select 'Size' as Field, topKWeighted(3)(size, MTOTAL) as Top3
    from BidRequest.mediaPlanning;

CREATE VIEW if not exists dmp.moroccanCarrierUsers AS
select userId,
       carrier,
       maxMerge(gender) as gender,
       maxMerge(yob)    as yob,
       sum(visits)      as visits
from dmp.users
where country = 'MAR'
  and conType = 3
group by userId, carrier
order by userId;


-- Tests

select *
from BidRequest.serverStatus;

select event_date, sum(visits)
from BidRequest.statsByDay
group by event_date
order by event_date desc;

select * from BidRequest.Top3;

select *
from dmp.moroccanCarrierUsers
limit 10;

select topKWeighted(3)(iab, visits) as TopMoroccanInterests
 from dmp.interests
where country = 'MAR';
Clone this wiki locally