To run a GBQ to Firestore data operation, you need a dedicated Python script such as the one below, which is used for this example.
Example of `file-to-firestore.py`:
import argparse
import os
import fnmatch
import datetime
import pytz
import simplejson as json
from google.cloud import firestore
# Number of writes accumulated before a batch is committed. This is the
# threshold the original script used.
# NOTE(review): presumably chosen to stay under Firestore's per-batch write
# limit -- confirm against the current quotas.
_MAX_BATCH_WRITES = 499


def _build_document_reference(fs_client, firestore_path):
    """Resolve a "|"-separated path into a Firestore document reference.

    Segments alternate collection, document, collection, ... starting with a
    collection, e.g. "users|42|orders|7".

    Args:
        fs_client: Firestore client used to open the root collection.
        firestore_path: "|"-separated path taken from the input record.

    Returns:
        The reference obtained by chaining ``collection()`` / ``document()``
        calls for every segment.

    Raises:
        ValueError: if the path has an odd number of segments, i.e. it ends
            on a collection and therefore cannot be the target of a write.
    """
    segments = firestore_path.split("|")
    if len(segments) % 2 != 0:
        raise ValueError(
            "firestore_path must end on a document "
            "(even number of segments): {!r}".format(firestore_path))

    reference = fs_client.collection(segments[0])
    for index, segment in enumerate(segments[1:], start=1):
        if index % 2 == 0:
            # Even positions are (sub)collections.
            reference = reference.collection(segment)
        else:
            # Odd positions are documents.
            reference = reference.document(segment)
    return reference


def _load_file_into_firestore(fs_client, batch, full_filename):
    """Write every JSON line of a file to Firestore through ``batch``.

    Each non-blank line must be a JSON object holding a "firestore_path" key.
    That key is removed from the payload and used to locate the target
    document; an "update_time" field (current UTC time) is added, and the
    payload is merged into the document.

    Args:
        fs_client: Firestore client used to build document references.
        batch: write batch reused across commits; it is left empty on return.
        full_filename: path of the newline-delimited JSON file to load.

    Returns:
        The number of documents written for this file.
    """
    pending_writes = 0
    total_writes = 0

    with open(full_filename, "r", encoding="utf-8") as input_file:
        print("Processing file {}".format(full_filename))

        # Stream the file instead of loading every line in memory.
        for line in input_file:
            # A blank (often trailing) line is not a record; skip it rather
            # than letting the JSON parser abort the whole load.
            if not line.strip():
                continue

            payload = json.loads(line)
            payload["update_time"] = datetime.datetime.now(pytz.timezone("UTC"))

            # The path only drives routing; it must not be stored in the
            # document itself.
            doc_ref = _build_document_reference(
                fs_client, payload.pop("firestore_path"))

            batch.set(doc_ref, payload, merge=True)
            pending_writes += 1

            if pending_writes >= _MAX_BATCH_WRITES:
                batch.commit()
                total_writes += pending_writes
                pending_writes = 0
                print(total_writes)

    # Final commit: flush whatever is left from the last partial batch.
    if pending_writes:
        batch.commit()
        total_writes += pending_writes
    print(total_writes)

    return total_writes


if __name__ == "__main__":

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("source_gcs", help="GCS Bucket.", type=str)
    parser.add_argument(
        "items",
        help="JSON object mapping each item name to its "
             "\"sub_dir\" and \"file_template\".",
        type=str)
    args = parser.parse_args()

    # NOTE(review): strip("/") also removes a leading slash, so an absolute
    # path becomes relative to the working directory -- confirm this is
    # intended.
    source_directory = args.source_gcs.strip().strip("/")

    # Process items
    #
    files_to_process_infos = json.loads(args.items)

    print("GCS source : {}".format(source_directory))
    print("Items to process : {}".format(files_to_process_infos))

    fs_client = firestore.Client(project="project_id")
    batch = firestore.WriteBatch(client=fs_client)

    for item_to_process, item_infos in files_to_process_infos.items():

        print("\nProcessing : {}".format(item_to_process))

        item_directory = source_directory + "/" + item_infos["sub_dir"]

        for root, dirs, files in os.walk(item_directory):
            for filename in fnmatch.filter(files, item_infos["file_template"]):
                # os.walk descends into sub-directories, so the file lives
                # under `root`, not necessarily directly under the item
                # directory.
                _load_file_into_firestore(
                    fs_client, batch, os.path.join(root, filename))