Working with Materialized Views in ClickHouse
⏱ Reading time: 7 minutes
This time we’ll illustrate how you can pass data on Facebook ad campaigns to ClickHouse tables with Python and implement materialized views. What are materialized views, you may ask? ClickHouse is often used to handle large amounts of data, and the time spent waiting for a response from a table of raw data keeps growing. The usual remedies are an ETL process or aggregate tables, but aggregate tables are inconvenient because we have to update them regularly. ClickHouse offers another way to meet the challenge: materialized views.
Materialized views store and update data on disk according to the SELECT query that defines the view: whenever new rows are inserted into the source table, ClickHouse runs that SELECT over them and writes the result into the materialized view.
Setting Up Amazon EC2 instance
We need to connect the Python script we created in this article to ClickHouse. The script will make queries over the network, so let’s open a few ports. In your AWS Dashboard go to Network & Security — Security Groups. Our instance belongs to the launch-wizard-1 group. Click it and pay attention to the Inbound rules; set them as shown in this screenshot:
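If you prefer the command line, the same ports can be opened with the AWS CLI. A minimal sketch, assuming the default ClickHouse ports (9000 for the native protocol, 8123 for the HTTP interface) and a default VPC where --group-name works; in a non-default VPC, use --group-id instead. Opening a port to 0.0.0.0/0 exposes it to the whole internet, so narrow the CIDR for anything beyond a test instance:
# Allow inbound TCP on the ClickHouse native protocol port
aws ec2 authorize-security-group-ingress --group-name launch-wizard-1 --protocol tcp --port 9000 --cidr 0.0.0.0/0
# Allow inbound TCP on the ClickHouse HTTP interface port
aws ec2 authorize-security-group-ingress --group-name launch-wizard-1 --protocol tcp --port 8123 --cidr 0.0.0.0/0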
Setting up Clickhouse
It’s time to set up ClickHouse. Let’s edit the config.xml file using the nano text editor:
cd /etc/clickhouse-server
sudo nano config.xml
If you’re not sure how to exit nano, learn more about its shortcuts here :)
To access your database from any IP address, uncomment this line:
<listen_host>0.0.0.0</listen_host>
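For the change to take effect, the server needs a restart. A quick check, assuming the standard clickhouse-server service from the official packages:
sudo service clickhouse-server restart
# The HTTP interface answers "Ok." when the server is up
curl http://localhost:8123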
Create a table and its materialized view
Open a terminal window to create our database with tables:
CREATE DATABASE db1
USE db1
We’ll refer to the same example of collecting data from Facebook. The data on ad campaigns may often change and be updated; with this in mind, we want to create a materialized view that automatically updates the aggregate table containing the cost data. Our ClickHouse table will look almost the same as the DataFrame used in the previous post. We picked ReplacingMergeTree as the engine for our table; it removes duplicates that share the same sorting key:
CREATE TABLE facebook_insights(
    campaign_id UInt64,
    clicks UInt32,
    spend Float32,
    impressions UInt32,
    date_start Date,
    date_stop Date,
    sign Int8 -- +1 for current rows, -1 to cancel outdated ones
) ENGINE = ReplacingMergeTree
ORDER BY (date_start, date_stop)
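A quick way to see the deduplication behaviour (hypothetical values): rows that share the same (date_start, date_stop) sorting key collapse into one after a merge, and FINAL forces that merge immediately:
INSERT INTO facebook_insights VALUES (1, 10, 1.5, 100, '2021-06-01', '2021-06-01', 1)
INSERT INTO facebook_insights VALUES (1, 12, 2.0, 120, '2021-06-01', '2021-06-01', 1)
OPTIMIZE TABLE facebook_insights FINAL
SELECT count() FROM facebook_insights -- returns 1: the older row was replaced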
And then, create a materialized view:
CREATE MATERIALIZED VIEW fb_aggregated
ENGINE = SummingMergeTree()
ORDER BY date_start
AS
SELECT campaign_id,
       date_start,
       sum(spend * sign) AS spent,
       sum(impressions * sign) AS impressions,
       sum(clicks * sign) AS clicks
FROM facebook_insights
GROUP BY date_start, campaign_id
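Keep in mind that SummingMergeTree collapses rows in the background, so the view can temporarily hold several partially aggregated rows per key. To be safe, re-aggregate at query time; a sketch:
SELECT campaign_id,
       date_start,
       sum(spent) AS spent,
       sum(impressions) AS impressions,
       sum(clicks) AS clicks
FROM fb_aggregated
GROUP BY date_start, campaign_id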
More details are available in the ClickHouse blog.
Unfortunately for us, ClickHouse doesn’t include the familiar UPDATE statement, so we need a workaround. The Yandex team suggests inserting the outdated rows again with a negative sign and then using the sign for reversal: when summing, the old data cancels out and is effectively ignored.
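To make the trick concrete, suppose a campaign’s spend was first recorded as 100 and later corrected to 120 (hypothetical numbers). We re-insert the old row with sign = -1 and add the corrected row with sign = 1, so the view’s aggregate sees:
sum(spend * sign) = 100*1 + 100*(-1) + 120*1 = 120
Only the fresh value survives the summation.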
Script
Let’s start writing the script and import a new library called clickhouse_driver, which allows us to make queries to ClickHouse from Python:
We are using the updated version of the script from “Collecting Data on Facebook Ad Campaigns”, but it will work fine if you just combine this code with the previous one.
from datetime import datetime, timedelta
from clickhouse_driver import Client
from clickhouse_driver import errors
An object of the Client class enables us to make queries with the execute() method. Type in your public DNS in the host field, set the port to 9000, specify default as the user, and name a database for the connection.
client = Client(host='ec1-2-34-56-78.us-east-2.compute.amazonaws.com', user='default', password='', port=9000, database='db1')
To ensure that everything works as expected, run the following query, which prints the names of all databases stored on the server:
client.execute('SHOW DATABASES')
If it succeeds, the query returns this list:
[('_temporary_and_external_tables',), ('db1',), ('default',), ('system',)]
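The errors module imported above is useful here. A minimal sketch of guarding the connection check, assuming a network problem is the most likely failure:
try:
    print(client.execute('SHOW DATABASES'))
except errors.NetworkError:
    print('Could not reach the server: check the host, the port, and the security group rules')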
For example, we want to get data for the past three days. Create several datetime objects with the datetime library and convert them to strings using the strftime() method:
date_start = datetime.now() - timedelta(days=3)
date_end = datetime.now() - timedelta(days=1)
date_start_str = date_start.strftime("%Y-%m-%d")
date_end_str = date_end.strftime("%Y-%m-%d")
This query returns all table columns for a certain period:
SQL_select = f"select campaign_id, clicks, spend, impressions, date_start, date_stop, sign from facebook_insights where date_start > '{date_start_str}' AND date_start < '{date_end_str}'"
Make a query and pass the data to old_data_list. Then flip each row’s sign to -1 and append it to new_data_list:
new_data_list = []
old_data_list = client.execute(SQL_select)
for elem in old_data_list:
    # Rows come back as tuples; convert to a list so the sign can be flipped
    elem = list(elem)
    elem[-1] = -1
    new_data_list.append(elem)
Finally, write our algorithm: insert the old data with sign = -1, optimize the table so ReplacingMergeTree removes the duplicates, and then INSERT the new data with sign = 1.
SQL_query = 'INSERT INTO facebook_insights VALUES'
client.execute(SQL_query, new_data_list)
SQL_optimize = "OPTIMIZE TABLE facebook_insights"
client.execute(SQL_optimize)
for i in range(len(insight_campaign_id_list)):
    client.execute(SQL_query, [[insight_campaign_id_list[i],
                                insight_clicks_list[i],
                                insight_spend_list[i],
                                insight_impressions_list[i],
                                datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
                                datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
                                1]])
client.execute(SQL_optimize)
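Executing one INSERT per row works for small campaigns, but clickhouse_driver also accepts a list of rows in a single call, which is much faster at larger volumes. A sketch of the same insert batched, assuming the insight_*_list variables from the previous article:
fresh_rows = [[insight_campaign_id_list[i],
               insight_clicks_list[i],
               insight_spend_list[i],
               insight_impressions_list[i],
               datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
               datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
               1] for i in range(len(insight_campaign_id_list))]
client.execute(SQL_query, fresh_rows)  # one round trip instead of one per row
client.execute(SQL_optimize)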
Get back to ClickHouse and run the following query to view the first 20 rows:
SELECT * FROM facebook_insights LIMIT 20
And run SELECT * FROM fb_aggregated LIMIT 20 to compare it with our materialized view:
Nice work! Now we have a materialized view that is updated every time the data in the facebook_insights table changes. The trick with the sign column lets us tell already-processed data apart and avoid counting it twice, while the ReplacingMergeTree engine helps us remove duplicates.