Ruan Bekker's Blog

From a Curious mind to Posts on Github

Using a SSH Reverse Tunnel to Access Nodes on Private Ranges

ssh-tunneling

A personal utility (actually just a command) that I use to reach my Raspberry Pi nodes that have no direct route via the internet.

Other Projects

There are a lot of other tools out there that already solve this issue, such as inlets, but I wanted my own, so that I can extend it with features as it pleases me.

Overview

This is more or less how it looks:

[VPS] <-- Has a Public IP
 |
 |
 [HOME NETWORK] <-- Dynamic IP
   |
   |
 [rpi-01:22], [rpi-02:22] <-- Private IPs
  • SSH Tunnel is setup from the Raspberry Pi Nodes
  • Each Raspberry Pi sets up a unique port on the VPS for the tunnel to traverse to the Rpi on port 22
  • To reach Rpi-01, you hop onto the VPS and ssh to localhost port 2201
  • To reach Rpi-02, you hop onto the VPS and ssh to localhost port 2202, etc

Progress

The tool still needs to be built, but using ssh it's quite easy.

Usage

Setup the SSH Reverse Tunnel from rpi-01:

$ ssh -i ~/.ssh/bastion.pem \
  -o StrictHostKeyChecking=no \
  -o UserKnownHostsFile=/dev/null \
  -o ServerAliveInterval=60 \
  -N -R 2201:localhost:22 \
  -p 22 ruan@bastion-9239.domain.cloud

Setup the SSH Reverse Tunnel from rpi-02:

$ ssh -i ~/.ssh/bastion.pem \
  -o StrictHostKeyChecking=no \
  -o UserKnownHostsFile=/dev/null \
  -o ServerAliveInterval=60 \
  -N -R 2202:localhost:22 \
  -p 22 ruan@bastion-9239.domain.cloud

On the VPS, we can see that we have ports 2201 and 2202 listening:

$ netstat -tulpn
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 127.0.0.1:2201          0.0.0.0:*               LISTEN      -
tcp        0      0 127.0.0.1:2202          0.0.0.0:*               LISTEN      -

To connect to rpi-01, we ssh to localhost on port 2201, from the VPS:

$ ssh -p 2201 pi@localhost
pi@rpi-01:~ $

To connect to rpi-02, we ssh to localhost on port 2202 from the VPS:

$ ssh -p 2202 pi@localhost
pi@rpi-02:~ $
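Since the eventual tool will mostly just wrap the ssh command shown earlier, here is a minimal Python sketch of what it could look like: it simply re-establishes the reverse tunnel whenever it drops. It assumes the same key, bastion host and port mapping used in this post, so treat it as a starting point rather than the finished tool:

import os
import subprocess
import time

BASTION = "ruan@bastion-9239.domain.cloud"
KEY = os.path.expanduser("~/.ssh/bastion.pem")
REMOTE_PORT = 2201  # 2201 for rpi-01, 2202 for rpi-02, and so on

def run_tunnel():
    # -N: no remote command, -R: expose our local sshd on the chosen VPS port
    cmd = [
        "ssh", "-i", KEY,
        "-o", "StrictHostKeyChecking=no",
        "-o", "UserKnownHostsFile=/dev/null",
        "-o", "ServerAliveInterval=60",
        "-N", "-R", "{}:localhost:22".format(REMOTE_PORT),
        "-p", "22", BASTION,
    ]
    while True:
        subprocess.call(cmd)  # blocks for as long as the tunnel stays up
        time.sleep(5)         # brief pause before reconnecting

if __name__ == "__main__":
    run_tunnel()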

Get the Top 10 Items on Hackernews in Python

This is a quick post on how to use Python to get the top 10 items from Hacker News:

import requests
import json

def get_top_ten():
    # get the ids of the top stories and keep only the first 10
    ids = requests.get('https://hacker-news.firebaseio.com/v0/topstories.json?print=pretty').json()[0:10]
    for id in ids:
        # fetch each item by its id
        postresponse = requests.get('https://hacker-news.firebaseio.com/v0/item/{postid}.json?print=pretty'.format(postid=id)).json()
        # not every item has a url (e.g. Ask HN posts), so use .get() to avoid a KeyError
        formatted = {"title": postresponse["title"], "type": postresponse["type"], "url": postresponse.get("url"), "by": postresponse["by"]}
        print(json.dumps(formatted, indent=2))

When running it:

>>> get_top_ten()
..
{
  "title": "Play Counter-Strike 1.6 in your browser",
  "type": "story",
  "url": "http://cs-online.club",
  "by": "m0ck"
}
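The function above only prints each post; if you would rather work with the results in code, a small variation (just a sketch) that reuses one HTTP session and returns a list of dicts:

import requests

def fetch_top_ten():
    # reuse a single session for the 11 requests instead of opening a new connection each time
    session = requests.Session()
    ids = session.get('https://hacker-news.firebaseio.com/v0/topstories.json').json()[0:10]
    posts = []
    for post_id in ids:
        item = session.get('https://hacker-news.firebaseio.com/v0/item/{}.json'.format(post_id)).json()
        posts.append({
            "title": item["title"],
            "type": item["type"],
            "url": item.get("url"),  # Ask HN posts have no url
            "by": item["by"],
        })
    return posts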

Improve MySQL Write Performance Using Batch Writes

mysql-python-performance

I am no DBA, but I got curious when I noticed sluggish write performance on a mysql database, and I remembered reading somewhere that you should always prefer batch writes over sequential writes. So I decided to test it out, using a python script and a mysql server.

What will we be doing

I wrote a Python script that writes 100,000 records to a database and keeps track of how long the writes took. There are two examples which I will compare:

  • One script writing each record to the database
  • One script writing all the records as batch

Sequential Writes

It took 48 seconds to write 100,000 records into a database using sequential writes:

...
for user in userids:
    userid = user["uid"]
    name = user["uid"].split('_')[0]
    job = random.choice(jobs)
    age = random.randint(24,49)
    credit_card_num = user["ccnum"]
    status = random.choice(["active", "inactive", "disabled"])

    cur.execute(
        """INSERT INTO customers(userid, name, job, age, credit_card_num, status) VALUES(%s, %s, %s, %s, %s, %s)""",
        (userid, name, job, age, credit_card_num, status)
    )
...

Running that shows us this:

$ python3 mysql_seq_writes.py
start
writing customers to database
finish
inserted 100000 records in 48s

Batch Writes

It took 3 seconds to write 100,000 records using batch writes:

...
for user in userids:
    userid = user["uid"]
    name = user["uid"].split('_')[0]
    job = random.choice(jobs)
    age = random.randint(24,49)
    credit_card_num = user["ccnum"]
    status = random.choice(["active", "inactive", "disabled"])

    bunch_users.append((userid, name, job, age, credit_card_num, status))

cur.executemany(
    """INSERT INTO customers(userid, name, job, age, credit_card_num, status) VALUES(%s, %s, %s, %s, %s, %s)""",
    bunch_users
)
...

Running that shows us this:

$ python3 mysql_batch_writes.py
start
writing customers to database
finish
inserted 100000 records in 3s

Looking at the Scripts

The script used for sequential writes:

import datetime
import random
import MySQLdb
from datetime import datetime as dt

host="172.18.0.1"
user="root"
password="password"
dbname="shopdb"
records=100000

db = MySQLdb.connect(host, user, password, dbname)

names = ['ruan', 'donovan', 'james', 'warren', 'angie', 'nicole', 'jenny', 'penny', 'amber']
jobs = ['doctor', 'scientist', 'teacher', 'police officer', 'waiter', 'banker', 'it']

cur = db.cursor()
cur.execute("DROP TABLE IF EXISTS customers")
cur.execute("CREATE TABLE customers(userid VARCHAR(50), name VARCHAR(50), surname VARCHAR(50), job VARCHAR(50), age INT(2), credit_card_num VARCHAR(50), status VARCHAR(10))")

bunch_users = []
userids = []

print("start")

def gen_id():
    return str(random.randint(0,9999)).zfill(4)

def gen_user(username):
    ccnum = '{0}-{1}-{2}-{3}'.format(gen_id(), gen_id(), gen_id(), gen_id())
    userid = username + '_' + ccnum.split('-')[0] + ccnum.split('-')[2]
    return {"uid": userid, "ccnum": ccnum}

for name in range(records):
    userids.append(gen_user(random.choice(names)))

print("writing customers to database")

timestart = int(dt.now().strftime("%s"))

for user in userids:
    userid = user["uid"]
    name = user["uid"].split('_')[0]
    job = random.choice(jobs)
    age = random.randint(24,49)
    credit_card_num = user["ccnum"]
    status = random.choice(["active", "inactive", "disabled"])

    #bunch_users.append((userid, name, job, age, credit_card_num, status))

    cur.execute(
        """INSERT INTO customers(userid, name, job, age, credit_card_num, status) VALUES(%s, %s, %s, %s, %s, %s)""",
        (userid, name, job, age, credit_card_num, status)
    )

db.commit()
db.close()
timefinish = int(dt.now().strftime("%s"))
print("finish")
print("inserted {} records in {}s".format(records, timefinish-timestart))

The script used for the batch writes:

import datetime
import random
import MySQLdb
from datetime import datetime as dt

host="172.18.0.1"
user="root"
password="password"
dbname="shopdb"
records=100000

db = MySQLdb.connect(host, user, password, dbname)

names = ['ruan', 'donovan', 'james', 'warren', 'angie', 'nicole', 'jenny', 'penny', 'amber']
jobs = ['doctor', 'scientist', 'teacher', 'police officer', 'waiter', 'banker', 'it']

cur = db.cursor()
cur.execute("DROP TABLE IF EXISTS customers")
cur.execute("CREATE TABLE customers(userid VARCHAR(50), name VARCHAR(50), surname VARCHAR(50), job VARCHAR(50), age INT(2), credit_card_num VARCHAR(50), status VARCHAR(10))")

bunch_users = []
userids = []

print("start")

def gen_id():
    return str(random.randint(0,9999)).zfill(4)

def gen_user(username):
    ccnum = '{0}-{1}-{2}-{3}'.format(gen_id(), gen_id(), gen_id(), gen_id())
    userid = username + '_' + ccnum.split('-')[0] + ccnum.split('-')[2]
    return {"uid": userid, "ccnum": ccnum}

for name in range(records):
    userids.append(gen_user(random.choice(names)))

for user in userids:
    userid = user["uid"]
    name = user["uid"].split('_')[0]
    job = random.choice(jobs)
    age = random.randint(24,49)
    credit_card_num = user["ccnum"]
    status = random.choice(["active", "inactive", "disabled"])

    bunch_users.append((userid, name, job, age, credit_card_num, status))

timestart = int(dt.now().strftime("%s"))

print("writing customers to database")
cur.executemany(
    """INSERT INTO customers(userid, name, job, age, credit_card_num, status) VALUES(%s, %s, %s, %s, %s, %s)""",
    bunch_users
)

db.commit()
db.close()
timefinish = int(dt.now().strftime("%s"))
print("finish")
print("inserted {} records in {}s".format(records, timefinish-timestart))

Thanks

Thanks for reading. It was interesting to see that when you have a large amount of writes, you should avoid sequential writes and rather write them in bulk.

Increase Performance With Your Ghost Blog on Docker

nginx-blog-ghost-caching

Nginx Caching + Ghost == Great Performance.

In this post we will build an nginx reverse proxy with caching enabled for static content such as images. Nginx will be our frontend with port 80 exposed, and our ghost blog will run as the backend, which we will proxy traffic to from our nginx container.

But why would you want caching?

Returning data from memory is a lot faster than returning data from disk. In this case a request is made against nginx, which proxies the request to ghost, gets the data that you requested and returns it to the client.

For items that rarely change, like images, we can benefit from caching: the first request is passed to ghost and the response is loaded into the nginx cache, so the next time you request the same image it is returned from cache instead of making that same request to ghost again.

Caching Info

For this demonstration we define the size of our cache as 500MB, and we specify that if an object has not been accessed for 24 hours, it can be expired from the cache.

Nginx

We will build our nginx container by adding our custom nginx config to our dockerfile.

Our Dockerfile will look like the following:

FROM nginx:stable
ADD nginx.conf /etc/nginx/nginx.conf

Our nginx.conf configuration file:

events {
  worker_connections  1024;
}

http {
  default_type       text/html;
  access_log         /dev/stdout;
  sendfile           on;
  keepalive_timeout  65;

  #proxy_cache_path /tmp/ghostcache levels=1:2 keys_zone=ghostcache:500m max_size=2g inactive=30d;
  proxy_cache_path /tmp/ghostcache levels=1:2 keys_zone=ghostcache:60m max_size=500m inactive=24h;
  proxy_cache_key "$scheme$request_method$host$request_uri";
  proxy_cache_methods GET HEAD;

  server {
    listen 80;

    location / {
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header Host $http_host;
        proxy_pass http://ghost:2368;
    }

    location ~* \.(?:css|js|ico)$ {
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header Host $http_host;
        proxy_pass http://ghost:2368;
        access_log off;
    }

    location ^~ /content/images/ {
        proxy_cache ghostcache;
        proxy_cache_valid 60m;
        proxy_cache_valid 404 1m;
        proxy_ignore_headers Set-Cookie;
        proxy_hide_header Set-Cookie;
        proxy_cache_use_stale error timeout invalid_header updating http_500 http_502 http_503 http_504;
        proxy_ignore_headers Cache-Control;
        add_header X-Cache-Status $upstream_cache_status;

        proxy_set_header Host $http_host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_pass http://ghost:2368;
        access_log off;
    }
  }
}

Then our docker-compose.yml, where we will add our nginx and ghost containers to run together:

version: '3.4'

services:
  ghost:
    image: ghost:3.15.1
    container_name: 'ghost'
    environment:
      - NODE_ENV=production
      - url=http://localhost:80
    networks:
      - ghost
    volumes:
      - ghost_content:/var/lib/ghost/content/data

  proxy:
    build: .
    container_name: 'proxy'
    depends_on:
      - ghost
    ports:
      - 80:80
    networks:
      - ghost

networks:
  ghost: {}

volumes:
  ghost_content: {}

To boot our stack:

$ docker-compose up

Test Caching

Once your containers are in a running state, open your browser's developer tools and look at the network tab, then access your ghost blog on http://localhost:80/. The first time an image is loaded you should see the X-Cache-Status header shows MISS; when you refresh you should see a HIT, which means that the object is being returned from your cache.
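If you prefer checking from a script instead of the browser, here is a small sketch using Python's requests library that shows the cache going from MISS to HIT (the image path is just a placeholder, use one that exists on your blog):

import requests

# replace with an image path that exists on your ghost blog
url = "http://localhost/content/images/2020/05/example.png"

for attempt in range(2):
    response = requests.get(url)
    # the add_header X-Cache-Status directive in our nginx config exposes the cache result
    print(attempt + 1, response.status_code, response.headers.get("X-Cache-Status"))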

Ingesting Pocket.com Links Into Elasticsearch

python-elasticsearch-pocket

Links that I stumble upon, I always save to getpocket.com and tag them with the relevant info. One day I had this random idea to list my links per category on a web service, and wondering how to approach that scenario led me to this.

In this post we will consume all our saved bookmarks from getpocket.com and ingest them into elasticsearch. But we don't want to read all the items from pocket's api every single time the consumer runs, therefore I have a method of checkpointing the last run with a timestamp, so the next time it runs, we have context on where to start from.

What will we be doing

We will authenticate with pocket, then write the code that reads the data from pocket and ingests it into elasticsearch.

Authentication

Head over to the developer console on pocket and create a new application, then save your config in config.py, which will look like this:

application_name = "Awesome Links"
application_link = "https://getpocket.com/developer/app/x/x"
application_url = "https://awesome-links.domain"
consumer_key = "x"
access_token = "x"
es_host = ""
es_user = ""
es_pass = ""

Ensure that you have the requests library installed (pip install requests). The code that I used to get an access token:

import config
import requests
import webbrowser
import time

CONSUMER_KEY = config.consumer_key
BASE_URL = "https://getpocket.com"
REDIRECT_URL = "localhost" # <-- you can run python -m SimpleHTTPServer 80 to have a local server listening on port 80
HEADERS = {"Content-Type": "application/json; charset=UTF-8", "X-Accept": "application/json"}

def request_code():
    payload = {
        "consumer_key": CONSUMER_KEY,
        "redirect_uri": REDIRECT_URL,
    }
    response = requests.post("https://getpocket.com/v3/oauth/request", headers=HEADERS, json=payload)
    print("request_code")
    print(response.json())
    return response.json()["code"]

def request_access_token(code):
    payload = {
        "consumer_key": CONSUMER_KEY,
        "code": code,
    }
    # give yourself time to approve the app in the browser before exchanging the code
    time.sleep(10)
    response = requests.post("https://getpocket.com/v3/oauth/authorize", headers=HEADERS, json=payload)
    print("request_access_token")
    print(response.json())
    return response.json()["access_token"]

def request_authorization(code):
    url = "https://getpocket.com/auth/authorize?request_token={code}&redirect_uri={redirect_url}".format(code=code, redirect_url=REDIRECT_URL)
    print("request_authorization")
    print(url)
    webbrowser.open(url, new=2)

def authenticate_pocket():
    code = request_code()
    request_authorization(code)
    return request_access_token(code)

authenticate_pocket()
# access_token will be returned

Main App

Once we have our access_token we can save it to config.py. We will also be working with elasticsearch, so we can add our elasticsearch info there as well:

#!/usr/bin/env python

import config
import requests
import time

CONSUMER_KEY = config.consumer_key
ACCESS_TOKEN = config.access_token
HEADERS = {"Content-Type": "application/json; charset=UTF-8", "X-Accept": "application/json"}
ES_HOST = config.es_host
ES_USER = config.es_user
ES_PASS = config.es_pass

def write_checkpoint(timestamp):
    response = requests.put(
        'https://{eshost}/pocket-data/_doc/checkpoint'.format(eshost=ES_HOST),
        auth=(ES_USER, ES_PASS),
        json={
            "checkpoint_timestamp": timestamp
        }
    )
    return {"checkpoint_timestamp": timestamp}

def get_checkpoint():
    response = requests.get(
        'https://{eshost}/pocket-data/_doc/checkpoint'.format(eshost=ES_HOST),
        auth=(ES_USER, ES_PASS)
    )
    checkpoint_timestamp = response.json()['_source']['checkpoint_timestamp']
    return checkpoint_timestamp

def ingest_to_es(payload):
    response = requests.put(
        'https://{eshost}/pocket-data/_doc/{item_id}'.format(eshost=ES_HOST, item_id=payload['item_id']),
        auth=(ES_USER, ES_PASS),
        json=payload
    )
    return response.json()

def convert_timestamp(epoch):
    return time.strftime('%Y-%m-%d', time.localtime(int(epoch)))

def mapper(pocket_item):
    try:
        payload = {
            "item_id": pocket_item['item_id'],
            "time_added": convert_timestamp(pocket_item['time_added']),
            "url": pocket_item['resolved_url'],
            "title": pocket_item['resolved_title'],
            #"excerpt": pocket_item['excerpt'],
            "tags": list(pocket_item['tags'].keys())
        }
    except:
        print("error, item has been skipped:")
        print(pocket_item)
        payload = "skip"
    return payload

def ingest_pocket_items(payload):
    pocket_items = list()
    pocket_items.extend(payload['list'].keys())
    last_scraped_time = payload['since']
    number_of_items = len(pocket_items)
    print('got {} items from pocket'.format(len(pocket_items)))
    time.sleep(5)
    if len(pocket_items) > 0:
        for pocket_item in pocket_items:
            mapped_payload = mapper(payload['list'][pocket_item])
            #print(mapped_payload)
            if mapped_payload != "skip":
                ingest_to_es(mapped_payload)
            print("Number of items left to ingest: {}".format(number_of_items))
            number_of_items-=1
    else:
        print('nothing new')
    print('writing checkpoint to es: {}'.format(last_scraped_time))
    write_checkpoint(last_scraped_time)
    return 'done'

def fetch_pocket_items(timestamp):
    response = requests.post(
        "https://getpocket.com/v3/get",
        headers=HEADERS,
        json={
            "consumer_key": CONSUMER_KEY,
            "access_token": ACCESS_TOKEN,
            "state": "all",
            "contentType": "article",
            "sort": "newest",
            "detailType": "complete",
            "since": int(timestamp)
        }
    )
    return response.json()

# get checkpoint
print('getting checkpoint id')
checkpoint_timestamp = get_checkpoint()
print('got checkpoint id: {}'.format(checkpoint_timestamp))
time.sleep(5)

# fetch items from pocket
print('fetch items from pocket')
pocket_response = fetch_pocket_items(checkpoint_timestamp)

# write
print('ingesting pocket items into es')
ingest_pocket_items(pocket_response)

What we are doing here is reading all the data that you saved in your account from the pocket api, and capturing the current time in epoch format, which we need in order to tell the next run when we last consumed; we keep that value in memory.

Then, from the data we received, we map the fields that we are interested in into key/value pairs and ingest the documents into elasticsearch.

After the initial ingestion has been done, which can take some time depending on how many items you have on pocket, it writes the checkpoint timestamp to elasticsearch so that the client knows from what time to search again on the next run.

This way we don't ingest all the items again. Testing it:

$ python server.py
getting checkpoint id
got checkpoint id: 1591045652
fetch items from pocket
ingesting pocket items into es
got 2 items from pocket
Number of items left to ingest: 2
Number of items left to ingest: 1
writing checkpoint to es: 1591392580

Add one more item to pocket, then run our ingester again:

$ python server.py
getting checkpoint id
got checkpoint id: 1591392580
fetch items from pocket
ingesting pocket items into es
got 1 items from pocket
Number of items left to ingest: 1
writing checkpoint to es: 1591650259

Search for one document on elasticsearch:

$ curl -u user:pass 'https://es.domain/pocket-data/_search?pretty=true&size=1'
{
  "took" : 194,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 766,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "pocket-data",
        "_type" : "_doc",
        "_id" : "2676106577",
        "_score" : 1.0,
        "_source" : {
          "item_id" : "2676106577",
          "time_added" : "2020-05-03",
          "url" : "https://programmaticponderings.com/2019/07/30/managing-aws-infrastructure-as-code-using-ansible-cloudformation-and-codebuild/",
          "title" : "Managing AWS Infrastructure as Code using Ansible, CloudFormation, and CodeBuild",
          "tags" : [
            "ansible",
            "aws",
            "cicd",
            "cloudformation",
            "devops"
          ]
        }
      }
    ]
  }
}

Search for aws tags:

$ curl -u x:x 'https://es.domain/pocket-data/_search?q=tags:aws&pretty=true&size=1'
{
  "took" : 101,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 112,
    "max_score" : 2.6346242,
    "hits" : [
      {
        "_index" : "pocket-data",
        "_type" : "_doc",
        "_id" : "2673747670",
        "_score" : 2.6346242,
        "_source" : {
          "item_id" : "2673747670",
          "time_added" : "2019-07-28",
          "url" : "https://github.com/lgoodridge/serverless-chat",
          "title" : "lgoodridge/serverless-chat",
          "tags" : [
            "aws"
          ]
        }
      }
    ]
  }
}

Now what

Now that our data is in elasticsearch, we can build a search engine or a web application that lists our favorite links per category. I will write up a post on the search engine in the future.

Thank You

If you liked this please send me a shout out on Twitter: @ruanbekker

Using Python RQ for Task Queues in Python

This is a getting-started tutorial on python-rq, where I will demonstrate how to work with asynchronous tasks using Python Redis Queue (python-rq).

What will we be doing

We want a client to submit thousands of jobs in a non-blocking, asynchronous fashion, and then we will have workers which consume these jobs from our redis queue and process those tasks at the rate our consumer can handle.

The nice thing about this is that, if our consumer is unavailable for processing, the tasks will remain in the queue and will be executed once the consumer is ready to consume. It's also nice that it's asynchronous, so the client doesn't have to wait until the task has finished.

We will run a redis server using docker, which will be used to queue all our jobs, then we will go through the basics in python and python-rq such as:

  • Writing a Task
  • Enqueueing a Job
  • Getting information from our queue, listing jobs, job statuses
  • Running our workers to consume from the queue and action our tasks
  • Basic application which queues jobs to the queue, consumes and action them and monitors the queue

Redis Server

You will require docker for this next step, to start the redis server:

$ docker run --rm -itd --name redis -p 6379:6379 redis:alpine

Python RQ

Install python-rq:

$ pip install rq

Create the task which will be actioned by our workers. In our case it will just be a simple function that adds all the digits from a given string to a list, then adds them up and returns the total value.

This is a very basic task, but it's just for demonstration.

Our tasks.py:

def sum_numbers_from_string(string):
    numbers = []
    for each_character in string:
        if each_character.isdigit():
            numbers.append(int(each_character))
    total = 0
    for each_number in numbers:
        total=total+each_number

    return total

To test this locally:

>>> from tasks import sum_numbers_from_string
>>> sum_numbers_from_string('adje-fje5-sjfdu1s-gdj9-asd1fg')
16

Now, let's import redis and rq along with our task, and instantiate a queue object:

>>> from redis import Redis
>>> from rq import Connection, Queue, Worker
>>> from tasks import sum_numbers_from_string
>>> redis_connection = Redis(host='localhost', port=6379, db=0)
>>> q = Queue(connection=redis_connection)

Submit a Task to the Queue

Let’s submit a task to the queue:

>>> result = q.enqueue(sum_numbers_from_string, 'hbj2-plg5-2xf4r1s-f2lf-9sx4ff')

The result object has a couple of properties which we can inspect. First, let's have a look at the id that we got back when we submitted our task to the queue:

>>> result.get_id()
'5a607474-cf1b-4fa5-9adb-f8437555a7e7'

We can also get the status from our task:

>>> result.get_status()
'queued'

We can also view our results in json format:

>>> import json
>>> print(json.dumps(result.to_dict(), indent=2, default=str))
{
  "created_at": "2020-05-16T11:56:49.892713Z",
  "data": "b'..\\x00\\x99\\xa0\\x16\\xfe..'",
  "origin": "default",
  "description": "tasks.sum_numbers_from_string('hbj2-plg5-2xf4r1s-f2lf-9sx4ff')",
  "enqueued_at": "2020-05-16T11:56:49.893252Z",
  "started_at": "",
  "ended_at": "",
  "timeout": 180,
  "status": "queued"
}

If we don't have the job id, we can use get_jobs to get all the jobs which are queued:

>>> list_jobs = q.get_jobs
>>> list_jobs()
[Job('5a607474-cf1b-4fa5-9adb-f8437555a7e7', enqueued_at=datetime.datetime(2020, 5, 16, 12, 30, 22, 699609))]

Then we can loop through the results and get the id like below:

>>> for j in list_jobs():
...     j.id
...
'5a607474-cf1b-4fa5-9adb-f8437555a7e7'

Or to get the job ids in a list:

>>> list_job_ids = q.get_job_ids()
>>> list_job_ids
['5a607474-cf1b-4fa5-9adb-f8437555a7e7']

Since we received the job id, we can use fetch_job to get more info about the job:

>>> fetched_job = q.fetch_job('5a607474-cf1b-4fa5-9adb-f8437555a7e7')
>>> fetched_job
Job('5a607474-cf1b-4fa5-9adb-f8437555a7e7', enqueued_at=datetime.datetime(2020, 5, 16, 12, 30, 22, 699609))

And as before we can view it in json format:

>>> fetched_job.to_dict()
{'created_at': '2020-05-16T12:30:22.698728Z', 'data': b'..x\x9c6\xfe..', 'origin': 'queue1', 'description': "tasks.sum_numbers_from_string('hbj2-plg5-2xf4r1s-f2lf-9sx4ff')", 'enqueued_at': '2020-05-16T12:30:22.699609Z', 'started_at': '', 'ended_at': '', 'timeout': 180, 'status': 'queued'}

We can also view the key in redis by passing the job_id:

>>> result.key_for(job_id='5a607474-cf1b-4fa5-9adb-f8437555a7e7')
b'rq:job:5a607474-cf1b-4fa5-9adb-f8437555a7e7'

To view how many jobs are in our queue, we can either do:

>>> len(q)
1

or:

>>> q.get_job_ids()
['5a607474-cf1b-4fa5-9adb-f8437555a7e7']

Consuming from the Queue

Now that our task is queued, let's fire off our worker to consume the job from the queue and action the task:

>>> w = Worker([q], connection=redis_connection)
>>> w.work()
14:05:35 Worker rq:worker:49658973741d4085961e34e9641227dd: started, version 1.4.1
14:05:35 Listening on default...
14:05:35 Cleaning registries for queue: default
14:05:35 default: tasks.sum_numbers_from_string('hbj2-plg5-2xf4r1s-f2lf-9sx4ff') (5a607474-cf1b-4fa5-9adb-f8437555a7e7)
14:05:40 default: Job OK (5a607474-cf1b-4fa5-9adb-f8437555a7e7)
14:05:40 Result is kept for 500 seconds
14:05:59 Warm shut down requested
True
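The worker above keeps blocking the shell until it is stopped (the "Warm shut down requested" line is me pressing Ctrl+C). If you only want to drain whatever is currently in the queue and then return, you can run the worker in burst mode:

>>> w = Worker([q], connection=redis_connection)
>>> w.work(burst=True)  # process everything currently queued, then return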

Now, when we get the status of our job, you will see that it finished:

>>> result.get_status()
'finished'

And to get the result from our worker:

>>> result.result
29

And like before, if you don't have your job id at hand, you can fetch the job and then return the result:

>>> result = fetched_job = q.fetch_job('5a607474-cf1b-4fa5-9adb-f8437555a7e7')
>>> result.result
29

Naming Queues

We can namespace our tasks into specific queues, for example if we want to create queue1:

>>> q1 = Queue('queue1', connection=redis_connection)

To verify the queue name:

>>> q1
Queue('queue1')

As we can see our queue is empty:

>>> q1.get_job_ids()
[]

Let’s submit 10 jobs to our queue:

>>> from uuid import uuid4
>>> for attempt in range(0,10):
...     random_string = uuid4().hex
...     q1.enqueue(sum_numbers_from_string, random_string)
...
Job('c3f2369d-5b27-40e0-97be-8fe26989a78e', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 472508))
Job('06b93517-5dae-4133-8131-e8d35b8dd780', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 475604))
Job('81f05aef-4bd6-421b-912d-78b5d419b10a', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 478071))
Job('8f14e81f-74fa-44d9-9fc7-e8e7b8c7b76f', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 480438))
Job('e8552750-89d2-4538-8c3e-a48c4c3e9a51', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 483106))
Job('bf19a0a3-eb0c-4692-b452-67c5ad954094', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 486193))
Job('0da3688a-cffa-4ba6-a272-b6cc90942ef6', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 488545))
Job('717bd147-615c-458d-8386-9ea6a198e137', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 491074))
Job('7cdac5aa-8dc3-40be-a8fc-b273ce61b03b', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 493618))
Job('4f7ea527-0695-4e2b-bc8b-3d8807a86390', enqueued_at=datetime.datetime(2020, 5, 16, 13, 1, 14, 496930))

To verify the number of jobs in our queue:

>>> q1.get_job_ids()
['c3f2369d-5b27-40e0-97be-8fe26989a78e', '06b93517-5dae-4133-8131-e8d35b8dd780', '81f05aef-4bd6-421b-912d-78b5d419b10a', '8f14e81f-74fa-44d9-9fc7-e8e7b8c7b76f', 'e8552750-89d2-4538-8c3e-a48c4c3e9a51', 'bf19a0a3-eb0c-4692-b452-67c5ad954094', '0da3688a-cffa-4ba6-a272-b6cc90942ef6', '717bd147-615c-458d-8386-9ea6a198e137', '7cdac5aa-8dc3-40be-a8fc-b273ce61b03b', '4f7ea527-0695-4e2b-bc8b-3d8807a86390']

And to count them:

>>> len(q1)
10

Cleaning the Queue

Cleaning the queue can either be done with:

>>> q.empty()
10

or

>>> q.delete(delete_jobs=True)

Then to verify that our queue is clean:

>>> q.get_job_ids()
[]

Naming Workers

The same way that we defined a name for our queue, we can define a name for our workers:

>>> worker = Worker([q1], connection=redis_connection, name='worker1')
>>> worker.work()

Which means you can have different workers consuming jobs from specific queues.
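To tie it all together, here is a small end-to-end sketch using the same tasks.py from earlier. The file names producer.py and worker.py are just an example layout: one script enqueues work onto queue1 and the other runs a named worker against that queue:

# producer.py - enqueue a few jobs onto 'queue1'
from uuid import uuid4
from redis import Redis
from rq import Queue
from tasks import sum_numbers_from_string

redis_connection = Redis(host='localhost', port=6379, db=0)
q1 = Queue('queue1', connection=redis_connection)

for _ in range(10):
    job = q1.enqueue(sum_numbers_from_string, uuid4().hex)
    print("queued:", job.get_id())

# worker.py - consume jobs from 'queue1' until stopped
from redis import Redis
from rq import Queue, Worker

redis_connection = Redis(host='localhost', port=6379, db=0)
q1 = Queue('queue1', connection=redis_connection)

worker = Worker([q1], connection=redis_connection, name='worker1')
worker.work()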

Resources

Documentation:

Thank You

I hope this was useful. If you enjoyed this, come say hi on Twitter @ruanbekker or visit my website at ruan.dev

Selecting and Returning Specific Data With JQ

I was working with curl to get data from an api, and wanted to get a specific url for a specific name within an array. I got it working using jq, and will demonstrate how I did it.

The data:

$ cat data.json | jq .
{
  "tag_name": "v1.17.5+k3s1",
  "name": "v1.17.5+k3s1",
  "assets": [
    {
      "url": "https://api.github.com/repos/rancher/k3s/releases/assets/20496869",
      "id": 20496869,
      "name": "e2e-passed-amd64-parallel.log",
      "state": "uploaded",
      "size": 1125136,
      "download_count": 3,
      "created_at": "2020-05-07T00:00:45Z",
      "updated_at": "2020-05-07T00:00:46Z",
      "browser_download_url": "https://github.com/rancher/k3s/releases/download/v1.17.5%2Bk3s1/e2e-passed-amd64-parallel.log"
    },
    {
      "url": "https://api.github.com/repos/rancher/k3s/releases/assets/20496281",
      "id": 20496281,
      "name": "k3s",
      "state": "uploaded",
      "size": 52740096,
      "download_count": 887,
      "created_at": "2020-05-06T23:45:02Z",
      "updated_at": "2020-05-06T23:45:03Z",
      "browser_download_url": "https://github.com/rancher/k3s/releases/download/v1.17.5%2Bk3s1/k3s"
    },
    {
      "url": "https://api.github.com/repos/rancher/k3s/releases/assets/20496655",
      "id": 20496655,
      "name": "k3s-armhf",
      "state": "uploaded",
      "size": 48431104,
      "download_count": 95,
      "created_at": "2020-05-06T23:48:05Z",
      "updated_at": "2020-05-06T23:48:06Z",
      "browser_download_url": "https://github.com/rancher/k3s/releases/download/v1.17.5%2Bk3s1/k3s-armhf"
    }
  ],
  "tarball_url": "",
  "zipball_url": "",
  "body": ""
}

Getting the json objects inside the array “assets”:

$ cat data.json | jq '.assets[]'
{
  "url": "https://api.github.com/repos/rancher/k3s/releases/assets/20496869",
  "id": 20496869,
  "name": "e2e-passed-amd64-parallel.log",
  "state": "uploaded",
  "size": 1125136,
  "download_count": 3,
  "created_at": "2020-05-07T00:00:45Z",
  "updated_at": "2020-05-07T00:00:46Z",
  "browser_download_url": "https://github.com/rancher/k3s/releases/download/v1.17.5%2Bk3s1/e2e-passed-amd64-parallel.log"
}
{
  "url": "https://api.github.com/repos/rancher/k3s/releases/assets/20496281",
  "id": 20496281,
  "name": "k3s",
  "state": "uploaded",
  "size": 52740096,
  "download_count": 887,
  "created_at": "2020-05-06T23:45:02Z",
  "updated_at": "2020-05-06T23:45:03Z",
  "browser_download_url": "https://github.com/rancher/k3s/releases/download/v1.17.5%2Bk3s1/k3s"
}
{
  "url": "https://api.github.com/repos/rancher/k3s/releases/assets/20496655",
  "id": 20496655,
  "name": "k3s-armhf",
  "state": "uploaded",
  "size": 48431104,
  "download_count": 95,
  "created_at": "2020-05-06T23:48:05Z",
  "updated_at": "2020-05-06T23:48:06Z",
  "browser_download_url": "https://github.com/rancher/k3s/releases/download/v1.17.5%2Bk3s1/k3s-armhf"
}

Entries where the "name" key contains "3s":

$ cat data.json | jq '.assets[] | select(.name | contains("3s"))'
{
  "url": "https://api.github.com/repos/rancher/k3s/releases/assets/20496281",
  "id": 20496281,
  "name": "k3s",
  "state": "uploaded",
  "size": 52740096,
  "download_count": 887,
  "created_at": "2020-05-06T23:45:02Z",
  "updated_at": "2020-05-06T23:45:03Z",
  "browser_download_url": "https://github.com/rancher/k3s/releases/download/v1.17.5%2Bk3s1/k3s"
}
{
  "url": "https://api.github.com/repos/rancher/k3s/releases/assets/20496655",
  "id": 20496655,
  "name": "k3s-armhf",
  "state": "uploaded",
  "size": 48431104,
  "download_count": 95,
  "created_at": "2020-05-06T23:48:05Z",
  "updated_at": "2020-05-06T23:48:06Z",
  "browser_download_url": "https://github.com/rancher/k3s/releases/download/v1.17.5%2Bk3s1/k3s-armhf"
}

Data that starts with “k3s” under the “name” key:

$ cat data.json | jq '.assets[] | select(.name | startswith("k3s"))'
{
  "url": "https://api.github.com/repos/rancher/k3s/releases/assets/20496281",
  "id": 20496281,
  "name": "k3s",
  "state": "uploaded",
  "size": 52740096,
  "download_count": 887,
  "created_at": "2020-05-06T23:45:02Z",
  "updated_at": "2020-05-06T23:45:03Z",
  "browser_download_url": "https://github.com/rancher/k3s/releases/download/v1.17.5%2Bk3s1/k3s"
}
{
  "url": "https://api.github.com/repos/rancher/k3s/releases/assets/20496655",
  "id": 20496655,
  "name": "k3s-armhf",
  "state": "uploaded",
  "size": 48431104,
  "download_count": 95,
  "created_at": "2020-05-06T23:48:05Z",
  "updated_at": "2020-05-06T23:48:06Z",
  "browser_download_url": "https://github.com/rancher/k3s/releases/download/v1.17.5%2Bk3s1/k3s-armhf"
}

Entries where the "name" is exactly "k3s":

$ cat data.json | jq '.assets[] | select(.name == "k3s")'
{
  "url": "https://api.github.com/repos/rancher/k3s/releases/assets/20496281",
  "id": 20496281,
  "name": "k3s",
  "state": "uploaded",
  "size": 52740096,
  "download_count": 887,
  "created_at": "2020-05-06T23:45:02Z",
  "updated_at": "2020-05-06T23:45:03Z",
  "browser_download_url": "https://github.com/rancher/k3s/releases/download/v1.17.5%2Bk3s1/k3s"
}

Getting only the “browser_download_url” value from the match:

$ cat data.json | jq '.assets[] | select(.name == "k3s") | .browser_download_url'
"https://github.com/rancher/k3s/releases/download/v1.17.5%2Bk3s1/k3s"

Server Backups to Google Drive Using the Drive CLI Tool

This tutorial will demonstrate how I ship my backups to Google Drive using the drive cli utility.

What I really like about the drive cli tool is that it's super easy to set up, and you can easily script your backups to ship them to google drive.

What we will be doing

We will set up the drive cli tool, authorize it with your google account, show how to upload your files to google drive from your terminal, and then create a script that automatically uploads your data to google drive, which we will include in a cronjob.

Setup Drive CLI Tool

Head over to the drive releases page and get the latest version, at the moment of writing 0.3.9 is the latest. Then we will move it to our path and make it executable:

$ wget https://github.com/odeke-em/drive/releases/download/v0.3.9/drive_linux
$ mv drive_linux /usr/bin/gdrive
$ chmod +x /usr/bin/gdrive

You should get output when running version as an argument:

$ gdrive version
drive version: 0.3.9

Credentials

Move to your home directory and initialize. This will ask you to visit the google accounts web page, where you will authorize this application to use your google drive account. Upon successful authorization, you will get an authorization code that we need to paste in our terminal.

This will then write the credentials file to ~/.gd/credentials.json. Always remember to keep this file safe.

$ gdrive init
Visit this URL to get an authorization code
https://accounts.google.com/o/oauth2/auth?access_type=offline&client_id=x&redirect_uri=x&response_type=code&scope=x&state=x
Paste the authorization code: < paste authorization code here >

You will now see that the credentials for your application have been saved, as seen below:

$ cat ~/.gd/credentials.json
{"client_id":"<redacted>","client_secret":"<redacted>","refresh_token":"<redacted>"}

Backup to Google Drive

On Google Drive, I have a backup folder named Backups, and my local path /opt/backups/ has the files that I want to back up to google drive:

$ ls /opt/backups/
app.backup-2020-05-05.tar.gz  
app.backup-2020-05-06.tar.gz

Now let's back up the files to Google Drive. It works as follows: gdrive push -destination (path on google drive) (path on local drive):

$ gdrive push -destination Backups/demo/app1 /opt/backups/*
Resolving...
+ /Backups/demo/app1/app.backup-2020-05-05.tar.gz
+ /Backups/demo/app1/app.backup-2020-05-06.tar.gz
Addition count 2 src: 26.32MB
Proceed with the changes? [Y/n]:Y

As you can see it checks what is on Google Drive and what is on the Local Drive, then determines what needs to be uploaded, and asks you if you want to continue.

If we run that command again, you will see that it does not upload it again, as the content is already on Google Drive:

$ gdrive push -destination Backups/demo/app1 /opt/backups/*
Resolving...
Everything is up-to-date.

To test it out, let’s create a new file and verify if it only uploads the new file:

$ touch file.txt
$ gdrive push -destination Backups/demo/app1 /opt/backups/*
Resolving...
+ /Backups/demo/app1/file.txt
Addition count 1
Proceed with the changes? [Y/n]:y

That is all cool, but if we want to script this, we don't want to be prompted to continue; we can avoid that by adding the argument -quiet:

$ gdrive push -quiet -destination Backups/demo/app1 /opt/backups/*

Scripting our Backup Task

Let's create a script that makes a local archive and then uploads it to Google Drive. I will create the file /opt/scripts/backup.sh with the following content:

#!/bin/bash

# make a local archive
tar -zcvf /opt/backups/app1.backup-$(date +%F).tar.gz \
  /home/me/data/dir1 \
  /home/me/data/dir2 \
  /home/me/data/dir3 \
  /home/me/data/dir4 

# backup to gdrive
sleep 1
gdrive push -quiet -destination Backups/Servers/sysadmins.co.za /opt/backups/*

# delete archives older than 14 days from disk
sleep 1
find /opt/backups/ -type f -name "*.tar.gz" -mtime +14 -exec rm {} \;

Make the file executable:

$ chmod +x /opt/scripts/backup.sh

Then, we want to add it as a cronjob so that it runs every night at 23:10 in my case:

Open crontab with crontab -e and add the following entry:

10 23 * * * /opt/scripts/backup.sh

Thank You

Backups are important, especially the moment you need to rely on one that was never made. Plan ahead so you don't end up in that situation.

How to Setup a Redis Exporter for Prometheus

In this tutorial we will visualize our Redis cluster's metrics with Grafana. In order to do that we will set up a redis exporter which authenticates with redis, and then configure prometheus to scrape the redis exporter's http endpoint and write the time series data to prometheus.

Install Golang

We need to build a binary from the redis exporter project, and we need a Golang environment. If you don’t have golang installed already:

$ cd /tmp/
$ wget https://dl.google.com/go/go1.14.2.linux-amd64.tar.gz
$ tar -xf go1.14.2.linux-amd64.tar.gz -C /usr/local
$ mkdir -p $HOME/go/{bin,src,pkg}
$ export GOPATH=$HOME/go
$ export PATH=${PATH}:${GOPATH}/bin:/usr/local/go/bin

You should now be able to get a response:

$ go version
go version go1.14.2 linux/amd64

Redis Exporter

Get the source code and build the binary:

$ git clone https://github.com/oliver006/redis_exporter.git
$ cd redis_exporter
$ go build .

Now the binary should be built, and you should be able to get a response when running the following:

$ ./redis_exporter --help

Copy the binary to the following path:

$ cp redis_exporter /usr/bin/

Then create the systemd unit file, in /etc/systemd/system/redis_exporter.service:

[Unit]
Description=Redis Exporter
Wants=network-online.target
After=network-online.target

[Service]
User=root
Group=root
Type=simple
ExecStart=/usr/bin/redis_exporter \
    -web.listen-address ":9121" \
    -redis.addr "redis://ip.of.redis.server:6379" \
    -redis.password "your-strong-redis-password"

[Install]
WantedBy=multi-user.target

Reload systemd:

$ systemctl daemon-reload

Then start the redis exporter:

$ systemctl restart redis_exporter

Now you should be able to get redis metrics when you hit the redis exporter:

$ curl http://127.0.0.1:9121/metrics
...
# TYPE redis_commands_duration_seconds_total counter
redis_commands_duration_seconds_total{cmd="auth"} 0.000308
redis_commands_duration_seconds_total{cmd="client"} 0.000251
redis_commands_duration_seconds_total{cmd="config"} 0.010594
redis_commands_duration_seconds_total{cmd="evalsha"} 229.214873
redis_commands_duration_seconds_total{cmd="get"} 0.002343
redis_commands_duration_seconds_total{cmd="info"} 0.013722
redis_commands_duration_seconds_total{cmd="latency"} 0.000557
redis_commands_duration_seconds_total{cmd="lrange"} 11.102069
redis_commands_duration_seconds_total{cmd="ltrim"} 3.731263
redis_commands_duration_seconds_total{cmd="ping"} 2e-05
redis_commands_duration_seconds_total{cmd="rpush"} 3.460981
redis_commands_duration_seconds_total{cmd="script"} 0.008393
redis_commands_duration_seconds_total{cmd="set"} 0.001329
redis_commands_duration_seconds_total{cmd="slowlog"} 0.001308
...
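As a quick sanity check from Python (just a sketch, assuming the exporter runs on 127.0.0.1:9121), you can confirm that the exporter is able to talk to redis by looking for the redis_up metric:

import requests

# scrape the exporter's metrics endpoint and print the redis_up gauge
metrics = requests.get("http://127.0.0.1:9121/metrics").text

for line in metrics.splitlines():
    if line.startswith("redis_up"):
        print(line)  # "redis_up 1" means the exporter can reach and authenticate with redis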

Configure Prometheus

If you don't have prometheus set up, you can view this blogpost to get it set up.

Then configure your prometheus.yml and add a target to scrape the redis exporter endpoint, so the time series data is written into prometheus:

scrape_configs:
  - job_name: redis_exporter
    static_configs:
    - targets: ['ip.of.redis.exporter:9121']

Then restart prometheus: if you are running docker, redeploy your stack or prometheus container; if prometheus runs as a service, you can use systemctl restart prometheus, depending on your operating system distribution.

Grafana

Head over to Grafana; if you don't have Grafana yet, you can view this post to install it.

Then import the dashboard 763 and after some time, you should see a dashboard more or less like this:

grafana-redis-dashboard

Nginx Analysis Dashboard Using Grafana and Elasticsearch

In this post we will be setting up an analytical dashboard using grafana to visualize our nginx access logs.

grafana-nginx-elasticsearch-prometheus

In this tutorial I will be using my other blog sysadmins.co.za, which is being served on nginx. We will also be setting up the other components such as filebeat, logstash, elasticsearch and redis, which are required if you would like to follow along.

The End Result

We will be able to analyze our Nginx Access logs to answer questions such as:

  • What are the top 10 countries accessing your website in the last 24 hours?
  • Who are the top 10 referrers?
  • What is the most popular page for the past 24 hours?
  • What does the percentage of 200s vs 404s look like?
  • Ability to view results based on status code
  • Everyone loves a world map to view hotspots

At the end of the tutorial, your dashboard will look similar to this:

grafana-elasticsearch-nginx-dashboard

High Level Overview

Our infrastructure will require Nginx with Filebeat, Redis, Logstash, Elasticsearch and Grafana and will look like this:

grafana-elasticsearch-logs-setup

I will drill down into how everything is connected:

  1. Nginx has a custom log_format that we define, which writes to /var/log/nginx/access_json.log and is picked up by Filebeat as an input.
  2. Filebeat has an output that pushes the data to Redis.
  3. Logstash is configured with Redis as an input, a filter section to transform the data, and an output to Elasticsearch.
  4. From Grafana we have a configured Elasticsearch datasource.
  5. We use the grafana template to build this awesome dashboard on Grafana.

But first, a massive thank you to akiraka for templatizing this dashboard and making it available on grafana.

Let’s build all the things

I will be using LXD to run my system/server containers (running ubuntu 18), but you can use a vps, cloud instance, multipass, virtualbox, or anything else to host the servers on which we will be deploying redis, logstash, etc.

Servers provisioned for this setup:

  • Nginx
  • Redis
  • Logstash
  • Elasticsearch
  • Grafana
  • Prometheus

Elasticsearch

If you don't have a cluster running already, you can follow this tutorial which will help you deploy an HA Elasticsearch cluster, or if you prefer docker, you can follow this tutorial.

Redis

For our in-memory data store, I will be securing my redis installation with a password as well.

Install redis:

$ apt update && apt install redis-server -y

Generate a password:

$ openssl rand -base64 36
9V5YlWvm8WuC4n1KZLYUEbLruLJLNJEnDzhu4WnAIfgxMmlv

In your redis config /etc/redis/redis.conf, you need to change the following:

...
bind 0.0.0.0
port 6379
daemonize yes
supervised systemd
requirepass 9V5YlWvm8WuC4n1KZLYUEbLruLJLNJEnDzhu4WnAIfgxMmlv
...

Restart redis to activate your changes:

$ systemctl restart redis.service

and then set and get a key using your password:

$ redis-cli -a "9V5YlWvm8WuC4n1KZLYUEbLruLJLNJEnDzhu4WnAIfgxMmlv" set test ok
$ redis-cli -a "9V5YlWvm8WuC4n1KZLYUEbLruLJLNJEnDzhu4WnAIfgxMmlv" get test
ok

Logstash

On the logstash server, install the requirements:

$ apt update && apt install wget apt-transport-https default-jre -y
$ wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | apt-key add -
$ echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | tee -a /etc/apt/sources.list.d/elastic-7.x.list

Now that the elastic repository is set up, we need to update and install logstash:

$ apt update && apt install logstash -y

Once logstash is installed, we need to provide logstash with a configuration. In our scenario we will have a redis input, a filter section to transform the data, and an elasticsearch output.

Just make sure of the following:

  • Populate the connection details of redis (we will define the key in filebeat later)
  • Ensure that GeoLite2-City.mmdb is in the path that I have under filter
  • Populate the connection details of Elasticsearch and choose a suitable index name; we will need to provide that index name in Grafana later

Create the config: /etc/logstash/conf.d/logs.conf and my config will look like the following. (config source)

input {
  redis {
    data_type =>"list"
    key =>"nginx_logs"
    host =>"10.47.127.37"
    port => 6379
    password => "9V5YlWvm8WuC4n1KZLYUEbLruLJLNJEnDzhu4WnAIfgxMmlv"
    db => 0
  }
}

filter {
  geoip {
    target => "geoip"
    source => "client_ip"
    database => "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-geoip-6.0.3-java/vendor/GeoLite2-City.mmdb"
    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    remove_field => ["[geoip][latitude]", "[geoip][longitude]", "[geoip][country_code]", "[geoip][country_code2]", "[geoip][country_code3]", "[geoip][timezone]", "[geoip][continent_code]", "[geoip][region_code]"]
  }
  mutate {
    convert => [ "size", "integer" ]
    convert => [ "status", "integer" ]
    convert => [ "responsetime", "float" ]
    convert => [ "upstreamtime", "float" ]
    convert => [ "[geoip][coordinates]", "float" ]
    remove_field => [ "ecs","agent","host","cloud","@version","input","logs_type" ]
  }
  useragent {
    source => "http_user_agent"
    target => "ua"
    remove_field => [ "[ua][minor]","[ua][major]","[ua][build]","[ua][patch]","[ua][os_minor]","[ua][os_major]" ]
  }
}
output {
  elasticsearch {
    hosts => ["10.47.127.132", "10.47.127.199", "10.47.127.130"]
    #user => "myusername"
    #password => "mypassword"
    index => "logstash-nginx-sysadmins-%{+YYYY.MM.dd}"
  }
}

Nginx

On our nginx server we will install nginx and filebeat, then configure nginx to log to a custom log format, and configure filebeat to read the logs and push them to redis.

Installing nginx:

$ apt update && apt install nginx -y

Installing filebeat:

$ curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.6.2-amd64.deb
$ dpkg -i filebeat-7.6.2-amd64.deb

Next we will configure nginx to log to a separate file with a custom log format that includes data such as the request method, upstream response time, hostname, remote address, etc.

Under the http directive in your /etc/nginx/nginx.conf, configure the log_format and access_log:

http {
...
        log_format json_logs '{"@timestamp":"$time_iso8601","host":"$hostname",'
                            '"server_ip":"$server_addr","client_ip":"$remote_addr",'
                            '"xff":"$http_x_forwarded_for","domain":"$host",'
                            '"url":"$uri","referer":"$http_referer",'
                            '"args":"$args","upstreamtime":"$upstream_response_time",'
                            '"responsetime":"$request_time","request_method":"$request_method",'
                            '"status":"$status","size":"$body_bytes_sent",'
                            '"request_body":"$request_body","request_length":"$request_length",'
                            '"protocol":"$server_protocol","upstreamhost":"$upstream_addr",'
                            '"file_dir":"$request_filename","http_user_agent":"$http_user_agent"'
                            '}';

        access_log  /var/log/nginx/access_json.log  json_logs;
...
}

Restart nginx to activate the changes:

$ systemctl restart nginx

Next we need to configure filebeat to read from our nginx access logs and configure the output to redis. Edit the filebeat config:

$ vim /etc/filebeat/filebeat.yml

Configure filebeat with the following, and make sure to change the values where you need to:

# config source: akiraka.net
# filebeat input 
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access_json.log
  json.keys_under_root: true
  json.overwrite_keys: true
  json.add_error_key: true

# filebeat modules 
filebeat.config.modules:
  # remove the escape character before the wildcard below
  path: ${path.config}/modules.d/\*.yml
  reload.enabled: false

# elasticsearch template settings
setup.template.settings:
  index.number_of_shards: 3

# redis output
output.redis:
  hosts: ["10.47.127.140:6379"]
  password: "9V5YlWvm8WuC4n1KZLYUEbLruLJLNJEnDzhu4WnAIfgxMmlv"
  key: "nginx_logs"
  # ^ this key needs to be the same as the configured key on logstash 
  db: 0
  timeout: 5

Restart filebeat:

$ systemctl restart filebeat

When you make a request to your nginx server, you should see a logline similar to the one below:

$ tail -n1 /var/log/nginx/access_json.log
{"@timestamp":"2020-04-28T20:05:03+00:00","host":"sysadmins-blog","server_ip":"10.68.100.89","client_ip":"x.x.x.x","xff":"x.x.x.x","domain":"sysadmins.co.za","url":"/","referer":"-","args":"-","upstreamtime":"0.310","responsetime":"0.312","request_method":"GET","status":"200","size":"4453","request_body":"-","request_length":"519","protocol":"HTTP/1.1","upstreamhost":"127.0.0.1:2369","file_dir":"/var/www/web/root/","http_user_agent":"Mozilla/5.0"}

Grafana

On the grafana server, install grafana:

$ apt update && apt install apt-transport-https software-properties-common wget -y
$ wget -q -O - https://packages.grafana.com/gpg.key | apt-key add -
$ add-apt-repository "deb https://packages.grafana.com/oss/deb stable main"
$ apt update && apt install grafana -y

Now we need to install a couple of grafana plugins that we require for our dashboards:

$ grafana-cli plugins install grafana-worldmap-panel
$ grafana-cli plugins install grafana-clock-panel
$ grafana-cli plugins install grafana-piechart-panel

Now reload systemd and restart grafana:

$ systemctl daemon-reload
$ systemctl restart grafana-server

If you would like to setup nginx as a reverse proxy to grafana, you can have a look at this blogpost on how to do that.

Prometheus

If you don’t have Prometheus installed already, you can view my blogpost on setting up Prometheus.

Verifying

To verify that everything works as expected, make a request to your nginx server, then check whether your index count on elasticsearch increases:

$ curl http://elasticsearch-endpoint-address:9200/_cat/indices/logstash-*?v
health status index                               uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   logstash-nginx-x-2020.04.28 SWbHCer-TeOcw6bi_695Xw   5   1      58279            0     32.6mb         16.3mb

If it doesn't, make sure that all the processes are running on the servers, and that the servers are able to reach each other on the targeted ports.

The Fun Part: Dashboarding

Now that we have everything in place, the fun part is to build the dashboards. First we need to configure elasticsearch as our datasource and specify the index we want to read from. Open grafana on http://ip.of.grafana.server:3000; the default user and password is admin.

Select config on the left and select datasources, add a datasource, select elasticsearch, and specify your datasource name (mine is es-nginx in this example) and the url of your elasticsearch endpoint. If you have secured your elasticsearch cluster with authentication, provide the auth, then provide your index name as configured in logstash.

My configured index looks like logstash-nginx-sysadmins-YYYY-MM-dd, therefore I specified the index name as logstash-nginx-sysadmins-*, my time field as @timestamp and the Elasticsearch version, then selected save and test, which looks like this:

grafana-elasticsearch-datasource-settings

Now we will import our dashboard template (once again a massive thank you to Shenxiang, Qingkong and Ruixi, who made this template available!): head over to dashboards and select import, then provide the ID 11190. After that it will prompt for what your dashboard needs to be named, and you need to select your Elasticsearch and Prometheus datasources.

The descriptions of the panels are in Chinese; if you would like them in English, I have translated mine and made the dashboard json available in this gist.

Tour of our Dashboard Panels

Looking at our hotspot map:

grafana

The summary and top 10 pages:

grafana-summary-and-top-10-pages

Page views, historical trends:

grafana-page-views

Top 10 referers and table data of our logs:

grafana-top-10-referers-and-log-table

Thank You

I hope this was useful. If you have any issues with this, feel free to reach out to me. If you like my work, please share this post, follow me on Twitter at @ruanbekker or visit me on my website.
