• Analytics, Cloud Platforms, M5Stack
    13 min | 301

    #GCP: Implementing Real-Time data pipelines - from ingest to datastore

    Two weeks ago, I published a tutorial that explains how to connect an M5Stack running MicroPython to the Google Cloud Platform using IoT Core, and I mentioned that upcoming tutorials would cover the following topics:

    • Collecting and synchronizing external data (weather from OpenWeatherMap) and data from other sensors (window/door status, sneezing detector).
    • Saving the data to a NoSQL database
    • Displaying the obtained data on Google Data Studio.
    • Training a prediction model

    This is the second tutorial in the series and it covers both collecting data from other sources (OpenWeatherMap) and saving it to a NoSQL database. I will use an unconventional approach to remain under the "always free tier" limits. As a consequence, the performance of the real-time data streaming will be reduced, which is not a problem for this project.

    Therefore, this tutorial will try to achieve the following (see Fig. 1):

    • Create a VM instance, in which a Docker Engine runs containers (data-pipeline-service, owm-service)
    • Create a service to obtain weather data from OpenWeatherMap using Python (owm-service)
    • Create a service to subscribe to the Pub/Sub subscriptions to pull data, process and send it to the Firestore database using Apache Beam (data-pipeline-service)
    • Build the Docker images using Cloud Build
    Fig. 1: Microservice application to check the air-quality running on GCP.

    The following products from GCP will be used in this tutorial:

    • IoT Core
    • Compute Engine
    • Pub/Sub
    • Firestore
    • Cloud Build
    • Container Registry

    I'm going to try to keep the tutorial as compact as possible, which is why I'm going to refer to GCP documentation links to extend the information when needed.

    Google offers $300 free credit for new customers to spend on Google Cloud Platform (GCP) products during the first 12 months. If you've already used that, like me, you need to activate billing and you should define a budget limit and cap the API usage. To define a budget: Go to the GCP console and then to Billing:

    Budget limit

    However, setting a budget does not cap API usage. The purpose of budgets is to allow you to trigger alert notifications so that you know how your spending is trending over time.
    You can read more about this here: Set budgets and budget alerts.

    Therefore, if you want to prevent incurring costs, you should cap the API usage using quotas.
## Software

The following software will be used in this tutorial:
## Create a VM instance

Google offers the f1-micro instance in the ["always free tier"](https://cloud.google.com/free) with a limit of 720 hours/month, which corresponds to 30 days. This means you can run one instance of this VM within your account for free. There are some region limitations, and these are the features:

* 1 non-preemptible f1-micro VM instance per month in one of the following US regions:
    * Oregon: us-west1
    * Iowa: us-central1
    * South Carolina: us-east1
* 30 GB-month HDD
* 5 GB-month snapshot storage in the following regions:
    * Oregon: us-west1
    * Iowa: us-central1
    * South Carolina: us-east1
    * Taiwan: asia-east1
    * Belgium: europe-west1
* 1 GB network egress from North America to all-region destinations per month (excluding China and Australia)

The f1-micro offers 1 virtualized CPU core (with burst capability to help with sudden load spikes) and 614 MB of memory. These specs don't sound great, but would they be enough for this project? Let's see!

To create a new VM instance, head over to the Google Cloud Console and sign up, and then go to the Google Compute Engine control panel. You may need to activate the API first. Then:

1. Click the **Create Instance** button.
2. **Configure** the VM hardware as described in Fig. 2.
    * Region: us-west1, us-central1 or us-east1
    * Series: N1
    * Machine type: f1-micro (1 vCPU, 614 MB memory)
3. **Configure** the VM software and firewall as described in Fig. 3.
    * Boot disk: I chose the latest Debian version (v.10 - Buster)
    * Firewall: Tick allow HTTP and HTTPS traffic only if you want to reach your VM from the open Internet; this is not necessary for this tutorial.
4. Click **Create** and wait for Google to create your VM instance.
    Fig. 2: Configure the VM type.
    Fig. 3: Configure the VM firewall and boot disk.
### SWAP on VM instance

The f1-micro has only 614 MB of memory, so swap space is needed. In this tutorial, I add `1G` of swap. If you want a bigger swap, replace `1G` with the size that you need, but 1 GB is more than enough for the containers planned in this project.

| |
|:--|
|![docker_gcp_running.jpg](https://lemariva.com/storage/app/uploads/public/5e9/222/e99/5e9222e99bf26429894177.png)|
|Fig. 4: VM f1-micro instance running|

Connect to the machine using the SSH button shown in Fig. 4. The following steps show how to add swap space on Debian 10:

1. Start by creating a file which will be used for swap:
    ```shell
    sudo fallocate -l 1G /swapfile
    ```
2. Only the root user should be able to read and write to the swap file. Issue the command below to set the correct permissions:
    ```shell
    sudo chmod 600 /swapfile
    ```
3. Use the `mkswap` tool to set up a Linux swap area on the file and activate the swap file by typing:
    ```shell
    sudo mkswap /swapfile
    sudo swapon /swapfile
    ```
4. Make the change permanent by opening the `/etc/fstab` file:
    ```shell
    sudo nano /etc/fstab
    ```
    and adding the following `/swapfile [...]` line (see below) at the end of the file:
    ```shell
    # /etc/fstab: static file system information
    # [...]
    /swapfile swap swap defaults 0 0
    ```
5. To verify that the swap is activated, type either the `swapon --show` or the `free -h` command as shown below:
    ```shell
    lemariva@esp32-core:~$ sudo swapon --show
    NAME      TYPE  SIZE   USED PRIO
    /swapfile file 1024M 182.3M   -2
    ```
    ```shell
    lemariva@esp32-core:~$ sudo free -h
                  total        used        free      shared  buff/cache   available
    Mem:          583Mi       488Mi        42Mi       4.0Mi        52Mi        22Mi
    Swap:         1.0Gi       370Mi       653Mi
    ```

### Docker on VM

After allocating the swap space, you need to install the Docker Engine and `docker-compose` to deploy the containers.
The Docker installation can be easily performed on Debian by typing:
```sh
curl -sSL https://get.docker.com | sh
```
Then, add your user to the `docker` group using:
```sh
sudo usermod -aG docker $USER
```
and `reboot` the VM:
```sh
sudo reboot
```
After installing Docker, you need to install `python3-pip` so that you can install `docker-compose`:
```sh
sudo apt-get update
sudo apt-get install python3-pip
pip3 install docker-compose
```

## OpenWeatherMap (OWM) service

This service makes a GET request to the OpenWeatherMap (OWM) API to get the current weather, processes the data, and publishes a message to Google Pub/Sub. Therefore, to connect to the services you need:

* an OWM API key and
* a key to connect to GCP (service account).

Furthermore, you need to create a topic to publish the messages.

### OWM API key

To get an OWM API key, click [here](https://openweathermap.org/api) and register. The [free plan](https://openweathermap.org/price) includes 1000 calls/day with a limit of 60 calls/min. Because the weather does not change too fast, this limitation is not a problem for this project. However, with the free plan, a weather update might take up to two hours. This drops to less than 10 minutes if you have a professional plan, which costs 470 USD/month. Thus, I stay with the free plan and hope that the weather does not change too fast to make the model training difficult or impossible.
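As a rough illustration of the request-and-process step described above, the sketch below builds the URL for OWM's current-weather endpoint and flattens a response into a flat dictionary. The helper names (`build_owm_url`, `flatten_weather`, `fetch_weather`, `calls_per_day`), the `units=metric` choice, the field mapping, and the 10-minute polling interval are assumptions made for illustration; the actual owm-service may be organized differently.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

OWM_ENDPOINT = "https://api.openweathermap.org/data/2.5/weather"


def build_owm_url(lat, lon, api_key, units="metric"):
    """Return the current-weather request URL for a coordinate pair."""
    query = urlencode({"lat": lat, "lon": lon, "appid": api_key, "units": units})
    return f"{OWM_ENDPOINT}?{query}"


def flatten_weather(payload):
    """Reduce an OWM current-weather response to a flat dict (assumed mapping)."""
    condition = payload["weather"][0]
    return {
        "id": condition["id"],
        "main": condition["main"],
        "icon": condition["icon"],
        "temp": payload["main"]["temp"],
        "feels_like": payload["main"]["feels_like"],
        "humidity": payload["main"]["humidity"],
        "visibility": payload.get("visibility"),
        "w_speed": payload["wind"]["speed"],
        "w_deg": payload["wind"].get("deg"),
        "all_clouds": payload["clouds"]["all"],
        "sunrise": payload["sys"]["sunrise"],
        "sunset": payload["sys"]["sunset"],
        "timestamp": payload["dt"],
    }


def fetch_weather(lat, lon, api_key):
    """Perform the GET request and return the flattened result (needs network access)."""
    with urlopen(build_owm_url(lat, lon, api_key), timeout=10) as response:
        return flatten_weather(json.load(response))


def calls_per_day(interval_seconds):
    """Number of requests per day when polling at a fixed interval."""
    return 24 * 60 * 60 // interval_seconds


# Hand-written sample in the shape of an OWM response (not real API output).
sample = {
    "weather": [{"id": 804, "main": "Clouds", "icon": "04d"}],
    "main": {"temp": 18.03, "feels_like": 15.07, "humidity": 42},
    "visibility": 10000,
    "wind": {"speed": 2.6, "deg": 120},
    "clouds": {"all": 93},
    "sys": {"sunrise": 1586579490, "sunset": 1586628718},
    "dt": 1586620922,
}

print(build_owm_url(52.33, 9.0, "YOUR_API_KEY"))
print(flatten_weather(sample))
print(calls_per_day(600))  # polling every 10 minutes (assumed interval)
```

Polling every 10 minutes results in 144 calls per day, which stays well below the 1000 calls/day of the free plan.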
### GCP key

To create a service account to get a JSON key:

1. Open the **Service Accounts page** in the Cloud Console.
2. Click **Select a project**, choose your project, and click **Open**.
3. Click **Create Service Account**.
4. Enter a service account name (friendly display name) and then click on **Create** (see Fig. 4).
5. Grant the service account the following **permissions** (see Fig. 5) and click on **Continue**:
    * Pub/Sub Publisher
    * Pub/Sub Subscriber
    * Cloud Datastore User

    We will use this account to publish messages from the owm-service (Pub/Sub Publisher permission), but also to get data and save it to the datastore (Pub/Sub Subscriber & Cloud Datastore User permissions) on the data-pipeline-service. You can create a different service account for each service. However, to make this tutorial easier, I will create only one service account for both services.
6. Click on **Create Key** (see Fig. 5), **select JSON** on the overlay and click on **Create** (see Fig. 6).

You can find more information about this topic [here](https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-console).
    Create a service account
    Fig. 3: Create a service account.
    Grant the service account permissions
    Fig. 4: Grant the service account permissions.
    Create a private key
    Fig. 5: Create a file with the private key.
    Download the private key
    Fig. 6: Download the private key of the service account.
### Topic and Subscription on Pub/Sub

To publish a message, you need to create a topic by following these steps:

1. Go to the Pub/Sub topics page in the Cloud Console.
2. Click on **Create a topic** (see Fig. 7).
3. In the **Topic ID** field, provide a unique topic name, for example, `openweathermap-topic`.
4. Click **Save**.

And that's all, you get the topic listed as in Fig. 7.
    Topics on Pub/Sub
    Fig. 7: Topics on Pub/Sub.
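Once the topic exists, publishing to it from Python could look roughly like the following. This is a sketch that assumes the `google-cloud-pubsub` client library is installed and that the `GOOGLE_APPLICATION_CREDENTIALS` environment variable points to the service account key; `topic_path`, `encode_message` and `publish_weather` are hypothetical helper names, not functions taken from the owm-service.

```python
import json


def topic_path(project_id, topic_id):
    """Fully qualified topic name in the form Pub/Sub expects."""
    return f"projects/{project_id}/topics/{topic_id}"


def encode_message(payload):
    """Pub/Sub message bodies are bytes, so serialize the dict as UTF-8 JSON."""
    return json.dumps(payload, sort_keys=True).encode("utf-8")


def publish_weather(project_id, topic_id, payload):
    """Publish one message and return the message ID assigned by the server."""
    # Imported lazily so the pure helpers above work without the library.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    future = publisher.publish(topic_path(project_id, topic_id), encode_message(payload))
    return future.result()  # blocks until the publish call has completed


print(topic_path("core-iot-sensors", "openweathermap-topic"))
print(encode_message({"temp": 18.03, "humidity": 42}))
```

A call such as `publish_weather("core-iot-sensors", "openweathermap-topic", {"temp": 18.03})` needs the Pub/Sub Publisher permission granted to the service account.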
### Setting up the project

In this subsection, you'll configure the project and set up an environment to upload the Docker files to GCP to build the image using [Cloud Build](https://cloud.google.com/cloud-build). You get **120 build-minutes/day free** to build images on GCP. Every extra minute will cost you USD 0.003. Having said that, you'll need the Google Cloud SDK. To install it, follow the instructions presented [here](https://cloud.google.com/sdk/docs#linux).

To configure the files of this project, I recommend using [Visual Studio Code (VSC)](https://code.visualstudio.com/). Clone the repository OpenWeatherMap-GCP by typing the following:
```sh
git clone https://github.com/lemariva/OpenWeatherMap-GCP.git
```
and open the folder using the menu **File > Open Folder** on VSC.

Before you upload the files to build the Docker image, you need to set up the service account key. This file will be copied into the container while it is being built. Therefore, rename the file `resources/credentials/service_account.json.example` to `service_account.json` and copy into it the content of the JSON file that you downloaded when you created the GCP key. Or just rename the file that you downloaded to `service_account.json` and copy it into `resources/credentials/`.

You can also modify the interval at which `main.py` checks the weather conditions on OpenWeatherMap. The variable that defines this interval is located in the `main.py` file and is set as `LOOP_TIME_SLEEP = 60 * 10`, which means every 10 minutes.

After finishing the steps above, you need to log in to GCP using the SDK by typing the following on a terminal:
```sh
gcloud auth login
```
This opens a browser, in which you select your Google account and give the SDK permission to access your account. If everything works, you'll see a webpage saying "You are now authenticated with the Google Cloud SDK!"
Then, you can select the project using:
```sh
gcloud config set project <PROJECT_ID>
```
and submit the files to build the Docker image by typing (inside the OpenWeatherMap-GCP folder):
```sh
gcloud builds submit --config cloudbuild.yaml
```
You'll get something like this:
```sh
Creating temporary tarball archive of 12 file(s) totalling 18.6 KiB before compression.
Uploading tarball of [.] to [gs://core-iot-sensors_cloudbuild/source/158[..].tgz]
[...]
FETCHSOURCE
Fetching storage object: gs://core-iot-sensors_cloudbuild/source/158[..].tgz#158[..]
Copying gs://core-iot-sensors_cloudbuild/source/158...
/ [1 files][ 7.9 KiB/ 7.9 KiB]
Operation completed over 1 objects/7.9 KiB.
BUILD
Already have image (with digest): gcr.io/cloud-builders/docker
Sending build context to Docker daemon 31.23kB
Step 1/8 : FROM python:3.7-buster
3.7-buster: Pulling from library/python
[...]
DONE
------------------------------------------
ID CREATE_TIME DURATION SOURCE IMAGES STATUS
a97ff1... 2020-04-12T14:10:13+00:00 1M25S gs://core-iot-sensors_cloudbuild/source/158[..].tgz gcr.io/core-iot-sensors/owm-service (+1 more) SUCCESS
```
As you can see in the log above, building the image took me about 85 seconds. To check the built images, go to the Container Registry page in the Cloud Console.

## Data Pipeline service

This service will subscribe to the Pub/Sub subscriptions, get messages, process and synchronize them, and finally save the results in Firestore. The pipeline is programmed using [Apache Beam in Python](https://beam.apache.org/documentation/sdks/python/). This service could also run on Dataflow (e.g. using the DataflowRunner), or you could create a job on Dataflow using the Cloud Console and a template, but a pipeline on Dataflow would cost you between about $0.40 and $1.20/hour. In this project, the pipeline will run on the f1-micro VM instance using the [Direct Runner](https://beam.apache.org/documentation/runners/direct/), which means locally.
This limits the pipeline performance, including possible scaling, but as I said before, this is not a big deal for this project. Furthermore, if you need more performance, you can change the runner to DataflowRunner and, with minor modifications (e.g. service authentication), it should work.

### Pub/Sub Subscription (getting the data)

The `apache_beam.io.ReadFromPubSub()` function can read messages from topics and subscriptions. As mentioned in the [documentation](https://beam.apache.org/releases/pydoc/2.19.0/apache_beam.io.gcp.pubsub.html), if you don't use the `subscription` parameter and provide the `topic` parameter, a temporary subscription will be created from the specified topic. However, the GCP key that we created above wasn't granted the permission to create subscriptions (Pub/Sub Editor). Thus, you will get this error: `google.api_core.exceptions.PermissionDenied: 403 User not authorized to perform this action.`. Therefore, the service will subscribe to existing subscriptions.

In the [tutorial](https://lemariva.com/blog/2020/04/micropython-google-cloud-platform-getting-data-m5stack-atom-sensing-air-quality) about connecting the M5Stack to Google IoT Core, I included the instructions to create a subscription to the sensor topic. We need to repeat these steps for the owm-service. To create a subscription:

1. Go to the **Subscriptions page** in the Cloud Console.
2. Click **Create subscription**.
3. Enter the **Subscription ID** (e.g. `owm-data`).
4. Choose or create a topic from the drop-down menu (e.g. `openweathermap-topic`). The subscription receives messages from the topic.
5. Click **Create**.

For more information, check this [tutorial](https://cloud.google.com/pubsub/docs/admin#pubsub-create-topic-protocol).

### GCP key

You need to provide a service account key file `service_account.json` inside the folder `resources/credentials/` as mentioned in the section above. The permissions needed for this service are `Pub/Sub Subscriber` and `Cloud Datastore User`.

### Setting up the service

To clone the pipeline service, type the following on a terminal:
```sh
git clone https://github.com/lemariva/uPyApacheBeam.git
```
The data pipeline reads the data from multiple topics and converts each message to a tuple of the form `(timestamp, dict)`. The dict contains the sensor/data values. This enables the service to sync the data using its timestamp (`TimestampedValue`). To do that, the pipeline uses `apache_beam.CoGroupByKey()` and a windowing system, `apache_beam.WindowInto`. This means: if messages from two topics have the same timestamp, they will be combined into a single element of a [PCollection](https://beam.apache.org/documentation/pipelines/design-your-pipeline/), which is then converted into an [Entity](https://cloud.google.com/datastore/docs/concepts/entities) and saved to Firestore; if they don't, they are saved separately as two documents inside the same collection.
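The reading and conversion steps described above might be sketched as follows. This is not the code from the repository: it assumes that `apache-beam[gcp]` is installed, that every message is a JSON object with a `timestamp` field, and a 60-second fixed window. The names `subscription_path`, `to_timestamped_pair` and `build_pipeline` are hypothetical.

```python
import json


def subscription_path(project_id, subscription_id):
    """Fully qualified subscription name, as expected by ReadFromPubSub."""
    return f"projects/{project_id}/subscriptions/{subscription_id}"


def to_timestamped_pair(message_bytes):
    """Decode a Pub/Sub payload into the (timestamp, dict) form."""
    data = json.loads(message_bytes.decode("utf-8"))
    return data["timestamp"], data


def build_pipeline(project_id, subscription_id, window_seconds=60):
    """Assemble a streaming pipeline that reads from an existing subscription."""
    # Imported lazily so the helpers above can be used without Apache Beam.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Reading from Pub/Sub requires streaming mode; with no runner given,
    # Beam falls back to the Direct Runner.
    options = PipelineOptions(streaming=True)
    pipeline = beam.Pipeline(options=options)
    (
        pipeline
        # Passing `subscription` (not `topic`) avoids creating a temporary subscription.
        | "Read" >> beam.io.ReadFromPubSub(
            subscription=subscription_path(project_id, subscription_id))
        | "Decode" >> beam.Map(to_timestamped_pair)
        # Attach the message's own timestamp as the element's event time.
        | "Stamp" >> beam.Map(lambda pair: beam.window.TimestampedValue(pair, pair[0]))
        | "Window" >> beam.WindowInto(beam.window.FixedWindows(window_seconds))
    )
    return pipeline


print(subscription_path("core-iot-sensors", "owm-data"))
print(to_timestamped_pair(b'{"timestamp": 1586620922, "temp": 18.03}'))
```

Calling `build_pipeline("core-iot-sensors", "owm-data").run()` would start the stream; the grouping and the write to Firestore are left out to keep the sketch short.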
A saved document under the `iot-air` collection on the Firestore looks like this:
```json
m5stack-air-condition
    apm10: 16
    apm100: 693
    apm25: 23
    cpm10: 16
    cpm100: 3
    cpm25: 23
    gas: 43709.48
    hum: 46.62529
    press: 1017.51
    temp: 22.81116
owm-service
    all_clouds: 93
    feels_like: 15.07
    humidity: 42
    icon: "04d"
    id: 804
    main: "Clouds"
    sunrise: 1586579490
    sunset: 1586628718
    temp: 18.03
    visibility: 10000
    w_deg: 120
    w_speed: 2.6
timestamp: 1586620922
```
In this case, two messages were combined and as you can see, the `device_id` was used as a map (nested node) for the sensor/data values. This is done in the `EntityWrapper` class.
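To make the grouping behaviour concrete, here is a plain-Python sketch (no Beam involved) of what joining by timestamp and nesting the values under each `device_id` amounts to. The function name `merge_by_timestamp`, the input layout, and the third sample message are illustrative assumptions; the real pipeline does this with `CoGroupByKey`, windowing and the `EntityWrapper` class, whereas this sketch only matches identical timestamps.

```python
from collections import defaultdict


def merge_by_timestamp(messages):
    """Group (timestamp, device_id, values) triples into one document per timestamp."""
    documents = defaultdict(dict)
    for timestamp, device_id, values in messages:
        # Each device becomes a nested map inside the document for its timestamp.
        documents[timestamp][device_id] = dict(values)
    return [
        {**devices, "timestamp": timestamp}
        for timestamp, devices in sorted(documents.items())
    ]


messages = [
    (1586620922, "m5stack-air-condition", {"temp": 22.81116, "hum": 46.62529}),
    (1586620922, "owm-service", {"temp": 18.03, "humidity": 42}),
    # Hypothetical later message with no matching sensor reading.
    (1586621522, "owm-service", {"temp": 18.1, "humidity": 41}),
]

for document in merge_by_timestamp(messages):
    print(document)
```

The first two messages share a timestamp and end up in one document with two nested maps; the third is stored on its own.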
    The data structure described above is useful for me to export the data to a bucket and import it into BigQuery for Google Data Studio. If you have a better or different idea for processing and saving the data, feel free to modify the file, leave me a comment, contact me, or send me a pull request on GitHub.
After modifying the files, you need to upload and build the image using Cloud Build. Therefore, type the following inside the uPyApacheBeam folder:
```sh
gcloud builds submit --config cloudbuild.yaml
```
and you'll get this:
```sh
Creating temporary tarball archive of 12 file(s) totalling 20.7 KiB before compression.
[...]
ID CREATE_TIME DURATION SOURCE IMAGES STATUS
83216... 2020-04... 2M12S gs://core-iot-... gcr.io/core-... SUCCESS
```
You need to activate the Firestore API. To do that, go to the Cloud Console and then to the [Firestore page](https://console.cloud.google.com/firestore) and **select** the Native mode. The mode you select (Native mode) will be permanent for the project.

## Deploy the microservice application

After creating both Docker images, you can use docker-compose to deploy the microservice application. But first, you need to give the VM access to the Container Registry and set up environment variables for the Docker containers.

### GCP key

As you may have already noticed, you'll need a service account to access the Container Registry. To create a service account to get a JSON key, follow the steps described above. This time, just grant the `Storage Object Viewer` permission to the service account as in Fig. 8.
    Create service account
    Fig. 8: Create a service account to access to the Container Registry.
    Upload files to the VM
    Fig. 9: Upload files to the f1-micro VM instance.
After creating and downloading the key, follow these steps to upload the key to the f1-micro VM:

1. Click on the **SSH button** of the f1-micro VM instance listed on the Google Compute Engine control panel (see Fig. 4).
2. **Upload** the downloaded file by clicking on the **Gear > Upload file** option (see Fig. 9).
3. **Authenticate** to the Container Registry by typing the following on the SSH terminal:
    ```sh
    docker login -u _json_key -p "$(cat [FILENAME].json)" https://[HOSTNAME]
    ```
    Replace `[FILENAME]` with the uploaded filename and `[HOSTNAME]` with gcr.io, us.gcr.io, eu.gcr.io, or asia.gcr.io, according to where your registry is located. You will get something like this:
    ```
    WARNING! Using --password via the CLI is insecure. Use --password-stdin.
    Login Succeeded
    ```

More info [here](https://cloud.google.com/container-registry/docs/advanced-authentication). If you prefer a secure way to store your credentials, check the following [tutorial](https://lemariva.com/blog/2019/11/raspberry-pi-encrypt-and-save-docker-credentials).

### Setting up the application

You need to modify some files before running the application:

1. **Rename** the file `.env.sample` from one of the repositories to `.env` and **modify** the following variables:
    * `PROJ_NAME`: your project name on GCP
    * `PROJ_TOPIC`: the topic that you've chosen for the owm-service (e.g. `openweathermap-topic`)
    * `PROJ_SUBS`: the subscription names for both services (owm-service, iot sensor from the [M5stack tutorial](https://lemariva.com/blog/2020/04/micropython-google-cloud-platform-getting-data-m5stack-atom-sensing-air-quality)) separated with a `,` without any spaces in between, e.g.
        ```
        projects/core-iot-sensors/subscriptions/esp32-iot-data,projects/core-iot-sensors/subscriptions/owm-data
        ```
    * `PROJ_DBKIND`: type a collection name (e.g. `iot-air`)
    * `OWM_API`: the OpenWeatherMap API key
    * `LOCATION`: latitude and longitude of your home (separated with a comma without spaces) to get the current weather (e.g. `52.33,9.00`, you can get it from Google Maps).

    Don't use `"` to enclose the variable values! This file includes the environment variables that will be used inside the container.
2. Modify the project name (core-iot-sensors) inside the `docker-compose.yml` to match your GCP project name.
3. **Create** a folder, e.g. **home-air-quality**, inside the VM using the SSH terminal:
    ```sh
    mkdir home-air-quality
    ```
4. **Upload** the `.env` and `docker-compose.yml` files using the **Gear > Upload file** option (see Fig. 9). If you don't see the .env file (Linux), right-click on the file browser and activate the `Show hidden Files` option. The files will be uploaded to the `$HOME` folder, therefore you need to move them inside the folder.
5. **Move** the files inside the `home-air-quality` folder:
    ```sh
    cd home-air-quality
    mv ~/.env .
    mv ~/docker-compose.yml .
    ```
6. **Start** the microservice application by typing:
    ```sh
    docker-compose up -d
    ```
    inside the folder where the `docker-compose.yml` file is located (see Fig. 10).
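Inside the containers, these variables arrive as plain strings, so the comma-separated ones have to be split before use. A minimal sketch of that parsing, assuming the variable names listed above; `parse_settings` is a hypothetical helper, the `OWM_API` value is a placeholder, and the services may read the environment differently.

```python
import os


def parse_settings(env=os.environ):
    """Turn the raw environment strings into typed settings."""
    latitude, longitude = (float(part) for part in env["LOCATION"].split(","))
    return {
        "project": env["PROJ_NAME"],
        "topic": env["PROJ_TOPIC"],
        "subscriptions": env["PROJ_SUBS"].split(","),
        "kind": env["PROJ_DBKIND"],
        "owm_api_key": env["OWM_API"],
        "location": (latitude, longitude),
    }


# Example values in the format described above (the API key is a placeholder).
example_env = {
    "PROJ_NAME": "core-iot-sensors",
    "PROJ_TOPIC": "openweathermap-topic",
    "PROJ_SUBS": "projects/core-iot-sensors/subscriptions/esp32-iot-data,"
                 "projects/core-iot-sensors/subscriptions/owm-data",
    "PROJ_DBKIND": "iot-air",
    "OWM_API": "YOUR_API_KEY",
    "LOCATION": "52.33,9.00",
}

settings = parse_settings(example_env)
print(settings["subscriptions"])
print(settings["location"])
```

If a value were wrapped in quotes and the quotes ended up inside the string, a conversion like `float('"52.33')` would raise a `ValueError`, which is one reason to leave the quotes out.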
    Fig. 10: Microservice application deployed detached.
    Fig. 11: Microservice application deployed in "debugging mode".
If you want to stop the application, you have these two options:

1. This stops the containers but doesn't remove anything:
    ```sh
    docker-compose stop
    ```
2. This stops the containers, removes them, and removes any network connected to them:
    ```sh
    docker-compose down
    ```

If you stopped the containers but actually wanted to remove them, just type:
```sh
docker-compose rm
```
The `rm` or `down` option is needed if you've updated the images on the Container Registry. Otherwise, `docker-compose` restarts the stopped containers. Furthermore, you need to force the pull of the latest image or remove the old Docker images. Otherwise, Docker takes the local images and doesn't download the newest versions. To force a pull of the latest images, just type:
```sh
docker-compose pull
```
For debugging purposes, you can skip the detached option `-d` and deploy the application using:
```sh
docker-compose up
```
This gives you Fig. 11, in which you can see the debugging messages of the application. You can also stop the application by pressing Ctrl+C (this doesn't remove the containers).

If everything works, you can check the data on the [Firestore page](https://console.cloud.google.com/firestore) in the Cloud Console (see Fig. 12).

    Fig. 12: Firestore acquiring data from the microservice application.
## Conclusions

This tutorial helps you to create a data pipeline from sensors running MicroPython and other data sources up to databases in the cloud. This is a follow-up to the last tutorial: [MicroPython: Google Cloud Platform getting data from an M5Stack ATOM sensing the air-quality](https://lemariva.com/blog/2020/04/micropython-google-cloud-platform-getting-data-m5stack-atom-sensing-air-quality). In this tutorial, you learned how to set up a virtual machine on GCP, install Docker, and run a microservice application that gets data from different sources, syncs it, and saves it in a NoSQL database.

The following products from GCP were used in this tutorial:

* [IoT Core](https://cloud.google.com/iot-core)
* [Compute Engine](https://cloud.google.com/compute)
* [Pub/Sub](https://cloud.google.com/pubsub/docs/overview)
* [Firestore](https://firebase.google.com/docs/firestore)
* [Cloud Build](https://cloud.google.com/cloud-build)
* [Container Registry](https://cloud.google.com/container-registry)

The setup can run for free using the "always free tier" offered by Google.

The next tutorial will be about getting the saved data and displaying it on Google Data Studio!