File-to-firestore python file

This is the description of the Python file used for a GBQ to Firestore data operation.

To run a GBQ to Firestore data operation, you need a Python script closely following the example below.

Example of file-to-firestore.py:

import argparse
import os
import fnmatch
import datetime
import pytz
import simplejson as json
from google.cloud import firestore


if __name__ == "__main__":

    parser = argparse.ArgumentParser(
        description=__doc__, 
        formatter_class=argparse.RawDescriptionHelpFormatter)

    parser.add_argument("source_gcs", help="GCS Bucket.", type=str)
    parser.add_argument("items", help="Items to process, as a JSON string.", type=str)

    args = parser.parse_args()

    source_directory = args.source_gcs.strip().strip("/")

    # Process items
    #
    files_to_process_infos = json.loads(args.items)

    print("GCS source       : {}".format(source_directory))
    print("Items to process : {}".format(files_to_process_infos))

    fs_client = firestore.Client(project="project_id")  # replace "project_id" with your own GCP project ID
    batch = firestore.WriteBatch(client=fs_client)

    for item_to_process in files_to_process_infos.keys():

        print("\nProcessing : {}".format(item_to_process))

        for root, dirs, files in os.walk(source_directory + "/" + files_to_process_infos[item_to_process]["sub_dir"]):

            for filename in fnmatch.filter(files, files_to_process_infos[item_to_process]["file_template"]):

                full_filename = source_directory + "/" + files_to_process_infos[item_to_process]["sub_dir"] + "/" + filename 

                with open(full_filename, "r", encoding="utf-8") as input_file:

                    print("Processing file {}".format(full_filename))

                    batch_index = 1
                    total_writes = 0
                    payload = {}

                    for line in input_file.readlines():

                        payload = json.loads(line)
                        payload["update_time"] = datetime.datetime.now(pytz.timezone("UTC"))

                        # build document path
                        #
                        doc_ref = None
                        for fs_path_index, fs_item_path in enumerate(payload["firestore_path"].split("|")):

                            # First pass to instantiate object
                            #
                            if fs_path_index == 0:
                                doc_ref = fs_client.collection(fs_item_path)
                                continue

                            # Add collection or document
                            #
                            if fs_path_index % 2 == 0:

                                # collection
                                #
                                doc_ref = doc_ref.collection(fs_item_path)

                            else:

                                # document
                                #
                                doc_ref = doc_ref.document(fs_item_path)

                        if batch_index >= 499:
                            total_writes += batch_index
                            batch.commit()
                            batch_index = 1
                            print(total_writes)
                        else:
                            batch_index += 1

                        del payload["firestore_path"]
                        batch.set(doc_ref, payload, merge=True)

                    # Final commit
                    #
                    batch.commit()

                    total_writes += batch_index
                    print(total_writes)
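
The page does not spell out the two positional arguments or the layout of the input files, so here is a minimal sketch, inferred from the script above, of what they could look like. The item name, sub-directory, file template, Firestore path and field names are illustrative, not taken from the Tailer documentation. Note also that the script reads the matched files with os.walk and open, so source_gcs must point to a location where the bucket content is locally accessible.

import simplejson as json

# "items" maps an arbitrary item name to the sub-directory and file template
# that the script scans under the source directory (illustrative values).
items = {
    "daily_predictions": {
        "sub_dir": "exports",
        "file_template": "predictions_*.json"
    }
}

# Each matched file is newline-delimited JSON. Every line must carry a
# "firestore_path" field whose "|"-separated segments alternate between
# collection and document IDs (collection|document|collection|document...).
sample_line = {
    "firestore_path": "stores|store_001|predictions|2024-01-15",
    "predicted_revenue": 1234.5
}

print(json.dumps(items))        # value to pass as the "items" argument
print(json.dumps(sample_line))  # one line of an input file

With these values, the script would write the document stores/store_001/predictions/2024-01-15, containing the predicted_revenue field plus the update_time field it adds itself.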

To use this Python deployment automation to Firestore, the script must follow a specific pattern.

Variables and descriptions

project_id (line 30, mandatory): the ID of the GCP project that hosts the target Firestore database, passed to firestore.Client(project=...) in the script.

Depending on your use case, it can be useful to have the last calculation date of your dataset in Firestore: this is why the script adds an update_time field, set to the current UTC time, to every document it writes.

Except for the project_id variable, nothing needs to be changed: you can copy and paste this code into the Python file of your GBQ to Firestore data operation.
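
As a sketch of the only required edit, with an optional read-back check (the project ID is a placeholder, and the document path refers to the illustrative example above):

from google.cloud import firestore

# Line 30 of the script: point the client at your own GCP project
# ("my-gcp-project" is a placeholder).
fs_client = firestore.Client(project="my-gcp-project")

# Optional sanity check after a run: read back one of the documents the
# script should have written (path from the illustrative example above).
snapshot = fs_client.document("stores/store_001/predictions/2024-01-15").get()
print(snapshot.exists, snapshot.to_dict())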

Global parameters
