Rehmat's Blog

Rehmat Alam
Rehmat Alam

Posted on • Updated on

Efficient Log Management with Logstash and Elasticsearch Streams

I have been running some successful API businesses for the last couple of years where my API services are receiving a large number of API requests. Until very recently, I have been using traditional relational databases to store the logs.

But as my businesses scaled, I faced challenges. Retaining a limited number of API logs per customer is a viable solution, but this is unacceptable at an enterprise scale where you are working with big companies and your customers expect your service to retain logs for a longer time period as well as they need powerful analytical capabilities on the logs of their API calls.

After doing a lot of research, I felt that it was now time to move to a better log management solution. So I chose the ELK stack and I instantly started loving it soon after I dived deep into this amazing software's capabilities. Now we are using the ELK stack for all search-related and log management requirements.

In this post, I am going to explain the design that I have implemented. This blog post is for you if you are facing the same challenges as I did. Apart from the intention to help others, I am writing this as a reference guide for myself for potential future uses.

Let's get started.

Configuring Elasticsearch

In the first step, we need to create an index life cycle management (ILM) policy for our logs index as well and we will configure component templates and index templates that will be used by our logs data stream.

Create an Index Lifecycle Management Policy

We will start by creating an ILM. You can either use Kibana UI by navigating to Stack Management > Index Lifecycle Management or you can use Elasticsearch API (using any HTTP client or using the Kibana developer console). In the Kibana developer console, execute this request to create the ILM:

PUT _ilm/policy/rw_trash_old_logs
{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_size": "50GB",
            "max_age": "30d"
          }
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

We created a policy called rw_trash_old_logs. When we start indexing our logs to data streams using this policy, the logs will be stored in a single index until the combined logs size reaches 50GB or until the index age reaches 30 days. After that, the logs will be stored in another index. When using data streams, you don't need to worry about indices because Elasticsearch handles this automatically for you and you can query all logs using the data stream name without specifying index names.

We are also deleting the logs when they are 90 days old ensuring that we don't end up consuming a lot of storage and other compute resources by storing logs indefinitely.

The beauty of this design is that you don't need to worry about the manual deletion of logs. You can specify multiple phases in your policy, but for my API business use case, the above policy is good enough. ILM policies can be used to perform more actions like moving the logs to a less expensive index (or storage solution) and so on.

Creating Component Templates

Now we will create two component templates. Although everything can be placed in a single template, for the reusability of the templates, I always prefer to create multiple component templates that do a single job. As a Python developer, I always try to follow the SOLID principles and by creating two component templates here, I pretend to follow the SOLID rule of Single Responsibility Principle (SRP) 😉

Let's create the component templates:

PUT /_component_template/logs-rw-default-mappings
{
  "template": {
    "mappings": {
    "dynamic": false,
    "properties": {
      "apikey_hash": { "type": "keyword" },
      "payload_json": { "type": "object" },
      "status_code": { "type": "integer" },
      "request_uri": { "type": "text" },
      "latency": { "type": "float" },
      "request_id": { "type": "keyword" },
      "@timestamp": {
        "type":   "date",
        "format": "strict_date_optional_time_nanos||epoch_millis"
      },
      "is_billed": { "type": "boolean" },
      "client_ip": { "type": "ip" },
      "ua_string": { "type": "text" },
      "consumer_id": { "type": "keyword" }
    }
  }
  }
}
Enter fullscreen mode Exit fullscreen mode

The above component template defines the mappings of the logs index.

Now let's create the next component template:

PUT _component_template/logs-rw-default-settings
{
  "template": {
    "settings": {
      "index.lifecycle.name": "rw_trash_old_logs"
    }
  },
  "_meta": {
    "description": "Settings for ILM"
  }
}
Enter fullscreen mode Exit fullscreen mode

This component template defines the settings for the logs index so the logs index will use our defined ILM policy. You may need to optimize the settings under some very high load scenarios, but the default settings are generally sufficient.

And now let's our combined final index template that will use the above component templates:

PUT _index_template/logs-rw-default
{
  "index_patterns": ["logs-rw-default*"],
  "data_stream": { },
  "composed_of": [ "logs-rw-default-mappings", "logs-rw-default-settings" ],
  "priority": 500,
  "_meta": {
    "description": "Template for our logs time series data"
  }
}
Enter fullscreen mode Exit fullscreen mode

The above index template will be used when we will push our logs to an index that starts with logs-rw-default. The priority of 500 ensures that if multiple index templates are matched for an incoming index request, this template gets priority.

Now let's try indexing some data:

POST logs-rw-default/_doc
{
  "apikey_hash": "sha-256-hash-of-api-key",
  "payload_json": {
    "search_query": "data scientists at apple",
    "location_country": "us",
    "followers_gt": 100
  },
  "status_code": 200,
  "request_uri": "/api/v3/person-details",
  "latency": 230,
  "request_id": "request-uuid-4",
  "@timestamp": "2023-10-21T21:13:57.000Z",
  "is_billed": true,
  "client_ip": "192.168.1.1",
  "ua_string": "ReactorNetty/1.1.7",
  "consumer_id": "uuid-4-id-of-customer"
}
Enter fullscreen mode Exit fullscreen mode

And the response will be something like this:

{
  "_index": ".ds-logs-rw-default-2023.11.05-000001",
  "_id": "og5Vn4sBbA2Haaanu5Ws",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "_seq_no": 0,
  "_primary_term": 1
}
Enter fullscreen mode Exit fullscreen mode

You can notice that the index name is dynamically allocated. It means that our defined templates and the ILM policies are working perfectly 👍

Configuring Logstash

So far, we have done our configuration at the Elasticsearch level. Now let's configure Logstash. For my API services, I use the Kong API gateway, so this Logstash configuration is specifically aimed at ingesting logs from the Kong API gateway.

Here is the Logstash configuration that I use:

input {
  http {
    port => 8080
    codec => "json"
  }
}

filter {
  fingerprint {
    source => "api_key"
    target => "apikey_hash"
    method => "SHA256"
  }

  if [request_payload] {
    json {
      source => "request_payload"
      target => "payload_json"
    }
  }

  mutate {
    add_field => {
      "status_code" => "%{[response][status]}"
      "latency" => "%{[latencies][proxy]}"
      "request_uri" => "%{[request][uri]}"
      "request_method" => "%{[request][method]}"
      "ua_string" => "%{[request][headers][user-agent]}"
      "request_id" => "%{[response][headers][api-request-id]}"
    }
    convert => {
      "status_code" => "integer"
    }
  }

  date {
    match => ["started_at", "UNIX_MS"]
    target => "@timestamp"
    timezone => "UTC"
  }

  mutate {
    convert => {
      "is_billed" => "boolean"
    }
  }

  if [consumer] {
    mutate {
        add_field => {
          "consumer_id" => "%{[consumer][id]}"
        }
    }
  } else {
    if [authenticated_entity] {
      mutate {
        add_field => {
            "consumer_id" => "%{[authenticated_entity][id]}"
        }
      }
    } else {
      mutate {
        add_field => {
          "consumer_id" => "null"
        }
      }
    }
  }

  prune {
    whitelist_names => [
      "^apikey_hash$",
      "^payload_json$",
      "^request_method$",
      "^status_code$",
      "^request_uri$",
      "^latency$",
      "^request_id$",
      "^@timestamp$",
      "^is_billed$",
      "^client_ip$",
      "^ua_string$",
      "^consumer_id$"
    ]
  }
}

output {
    elasticsearch {
      hosts => ["${ELASTIC_HOSTS}"]
      data_stream => "true"
      data_stream_type => "logs"
      data_stream_dataset => "rw"
      data_stream_namespace => "default"
      user => "${ELASTIC_USER}"
      password => "${ELASTIC_PASSWORD}"
      ssl => true
      cacert => "/usr/share/logstash/config/certs/ca/ca.crt"
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

When we start Logstash using this configuration, Logstash will use an HTTP input plugin. It will listen on port 8080 for incoming HTTP requests. We configure the Logstash endpoint with port 8080 as the destination URL in the Kong HTTP logs plugin. When Kong posts the log data to the Logstash endpoint, Logstash performs some mutations to excluded undeeded data and then it posts the formatted JSON data to Elasticsearch as a data stream.

That's it. Onward, you can use Kibana to visualize and analyze logs. With this robust logs storage solution implemented, now you have unlimited possibilities when it comes to analyzing the API calls made at your API services.

I hope this article helped you understand how to use ELK stack to manage logs efficiently. As this is my first implementation of ELK for log management, there might be a lot of further improvement. You can share your thoughts on this. Feel free to leave a comment if you have something to discuss.

Top comments (0)