Automatically Delete Old Data From Druid

November 25, 2017

We use Druid, an open source metrics data store, for several of our metrics collection needs at work. The nature of our runtime environment requires us to delete old data from Druid’s deep storage (Hadoop/HDFS in our case).

This use case seems to be fairly common, and every database handles it differently. Elasticsearch uses a separate curator program, in MySQL we run scripts from jobs, and, if memory serves, Influx has the functionality built in. Druid also has it built in, but it was surprisingly difficult to get working. Here is how I eventually accomplished it.

Ingredients

  • Druid 0.10.1
  • Red Hat Enterprise Linux Server release 6.9
  • MySQL 5.6.35
  • Hadoop 2.7.2
  • Zookeeper 3.4.6
  • Kafka 0.10.0.1

Requirement 1 - Coordinator Rules

Druid requires an external database to store its metadata. In this database, there is a table called druid_segments. In that table, there is a column called used. If a segment of data is to be destroyed, that value must be set to 0. Since segments are created dynamically, we need an automated way to set these values. This is done via Coordinator Rules.
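
If you ever want to confirm what has been flagged, you can peek at that table directly. Here is a quick check against the metadata store, assuming MySQL as the metadata database and the default schema (credentials, database name, and column names are examples to verify against your installation):

$ mysql -u druid -p druid        # user and database name are examples
mysql> -- segments that have been marked unused (i.e. eligible for deletion)
mysql> SELECT dataSource, `start`, `end`, used FROM druid_segments WHERE used = 0;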

It is suggested in the documentation that these rules be configured via the Coordinator Console (as opposed to the HTTP endpoints included in the coordinator). Follow these steps to create the rules for a given datasource:

Note: The Coordinator Console I’m referring to here is reached via the coordinator IP address and port. I am not referring to the “Coordinator Console” that can be reached via the overlord IP address and port (aka Indexer Coordinator Console). I know, clear as mud.

  1. Point your browser to http://<coordinator host or ip>:<coordinator port>.
  2. Click the datasources button at the top of the page.
  3. Click the pencil icon next to the target data source on the left of the page (e.g. test.example.metrics.blah). A modal will be displayed that will allow you to create the rules.
  4. Put your name in the “Who is making this change?” input field and place a required comment in the text area.
  5. Click the add a rule button. The first rule should be set to the Load Period that you want to keep data around for. An example period for this rule is P1W for one week of data.
  6. Click the + _default_tier replicant button. The _default_tier can be left with a value of 1.
  7. Click the add a rule button again. The second rule should be Drop Forever.
  8. Click the Save all rules button.

That’s it for the rule setup. If you have any default rules set up, they will be evaluated after the individual datasource rules. That means if a segment matches an individual rule, it will not be checked against any default rules. Rule order is very important!
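
If you would rather script this than click through the console, the same two rules can be posted to the coordinator HTTP endpoints mentioned above. The sketch below is based on the rule API as documented for Druid 0.10; treat the audit headers and rule field names as assumptions to double check against your version:

$ curl -X POST \
    -H 'Content-Type: application/json' \
    -H 'X-Druid-Author: your-name' \
    -H 'X-Druid-Comment: keep one week of data, drop everything older' \
    http://<coordinator host or ip>:<coordinator port>/druid/coordinator/v1/rules/test.example.metrics.blah \
    -d '[
          {"type": "loadByPeriod", "period": "P1W", "tieredReplicants": {"_default_tier": 1}},
          {"type": "dropForever"}
        ]'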

Requirement 2 - Coordinator Operation Configuration

The load and drop rules described above will remove old data from the Druid cluster (i.e. the data will not be returned in queries). The data still exists in deep storage on disk, however. To solve this “problem”, we must run kill tasks.

The challenge with kill tasks is that they are run via POST requests and each requires a specific interval to be given. We could solve that with a script and a cron job, but there is a better way. The coordinator can be configured to create kill tasks automagically when it finds segments that are not used. Within the coordinator’s runtime.properties file, add the following options:

druid.coordinator.kill.on=true
druid.coordinator.kill.period=PT12H
druid.coordinator.kill.durationToRetain=P7D
druid.coordinator.kill.maxSegments=100

The options are fairly self-explanatory. Key points to note from the documentation if druid.coordinator.kill.on is set to true:

  • druid.coordinator.kill.period defaults to 1 day and must be greater than druid.coordinator.period.indexingPeriod, which defaults to 30 minutes.
  • druid.coordinator.kill.durationToRetain and druid.coordinator.kill.maxSegments must be configured as their default values are invalid.
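
For reference, a kill task can still be submitted by hand when you want to purge a specific interval right away instead of waiting for the coordinator. This is a sketch of the task spec described in the Druid docs; the overlord address and the interval are examples:

# submit a one-off kill task to the overlord for a single interval
$ curl -X POST \
    -H 'Content-Type: application/json' \
    http://<overlord host or ip>:<overlord port>/druid/indexer/v1/task \
    -d '{"type": "kill", "dataSource": "test.example.metrics.blah", "interval": "2017-10-01/2017-10-08"}'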

Requirement 3 - Coordinator Dynamic Configuration

Druid wants to make sure that when it deletes data, you really want it deleted. As such, once the coordinator operation is configured correctly and the drop rules are in place, we still need to add data sources to the killDataSourceWhitelist. To accomplish this:

  1. Point your browser to http://<coordinator host or ip>:<coordinator port>.
  2. Click on the pencil icon next to the druid cluster name in the top left corner of the page. If you don’t see the icon, make sure you’re on the cluster page of the console.
  3. Put your name in the “Who is making this change?” input field and place a required comment in the text area.
  4. Enter the data source name into the killDataSourceWhitelist field (e.g. test.example.metrics.blah). Note: This is the same name we made the rule for in Requirement 1.
  5. Click Save.

There is a killAllDataSources field in the configuration modal that can be set to true if you want all data to be deleted based on the drop rules set for each data source. Make sure you understand the implications of setting this value before you do it. If a drop rule is created incorrectly and it isn’t caught in time, all of your data may be destroyed. I believe this field was recently added to Druid in version 0.10. Also, if the killAllDataSources field is set to true, the killDataSourceWhitelist field must be empty.
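
The whitelist can also be managed through the coordinator’s dynamic configuration endpoint if you prefer scripting it. The sketch below shows only the whitelist field for brevity; a POST may replace the whole dynamic configuration, so it is safest to fetch the current values first and merge your change into them, and verify the endpoint against your Druid version:

# see what the dynamic configuration currently looks like
$ curl http://<coordinator host or ip>:<coordinator port>/druid/coordinator/v1/config

# post the updated configuration back (only the whitelist field is shown here)
$ curl -X POST \
    -H 'Content-Type: application/json' \
    -H 'X-Druid-Author: your-name' \
    -H 'X-Druid-Comment: allow kill tasks for this datasource' \
    http://<coordinator host or ip>:<coordinator port>/druid/coordinator/v1/config \
    -d '{"killDataSourceWhitelist": ["test.example.metrics.blah"]}'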

Some Data Still Exists In Deep Storage

I’m not sure why, but after my coordinator ran a few times to clean out all of my old test data, I noticed some old segments hung around in Hadoop. The rows were gone from the druid_segments table in Druid’s metadata store but the segments were still in deep storage.

If you want to delete the lingering data, run the following command on one of the Hadoop nodes:

$ hdfs dfs -rm -R hdfs://<namenode host or ip>:<namenode rpc port><path-to-segment-to-delete>

Wildcards can be used in the segment path. If you have hourly segments being created for example, and you want to delete all of the segments for a given day, your path might look something like /druid/segments/test.example.metrics.blah/20171010T*.
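
Before deleting anything, it is worth listing what the path (or wildcard) actually matches:

# list the segment directories for the datasource before removing them
$ hdfs dfs -ls hdfs://<namenode host or ip>:<namenode rpc port>/druid/segments/test.example.metrics.blah/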

A convenient way to browse your data in Hadoop and confirm all of this is working:

  1. Point your browser to http://<namenode host or ip>:<namenode http port>. The default http port is 50070.
  2. Click on Utilities in the top navigation bar.
  3. Click on Browse the file system.

If you see an error message on the page that says something like “Operation category READ is not supported in state standby”, that likely means the Hadoop namenode is in standby mode and must be activated with a command similar to:

$ hdfs --config <configuration location> haadmin -transitionToActive <name of namenode>
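
If you are not sure which namenode is active, you can check each one’s state first (the namenode names come from your HDFS HA configuration):

# prints either "active" or "standby" for the given namenode
$ hdfs --config <configuration location> haadmin -getServiceState <name of namenode>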

Done

That’s all there is to it. Easy peasy, right? If you said yes, you know more about Druid than me. If you answered no, I hope this article has helped make your day a little better.