Build a simple Python Console App that uses MongoDB driver

Introduction
In this article we’ll build a very simple Menu-like Console Application in Python, that connects and operates MongoDB via PyMongo driver.
Table of Contents
Open Table of Contents
Prerequisites
Before starting, make sure you have:
- Python runtime installed
- MongoDB server instance running
For this tutorial we assume that MongoDB is available locally on:
mongodb://127.0.0.1:27017
Environment setup
Open Command Prompt and run:
python -m venv myonlinestore
This will create a Python virtual environment directory for your sample console app project, in our case this is myonlinestore.
At this point your myonlinestore project directory will have the following structure:
myonlinestore/
include/
Lib/
Scripts/
.gitignore
pyenv.cfg
Before we use the Virtual Environment, we need to activate it by running:
myonlinestore\Scripts\activate
As a result, in the Command Prompt, you should see the following:
(myonlinestore) C:\Users\<your_user_name>>
Next step is to install the PyMongo driver, in order to be able to connect to MongoDB:
pip install pymongo
Now, let’s verify the installation of PyMongo. To do so:
- Create a
test.pyfile in themyonlinestoreproject directory with the following contents:
import pymongo
print(pymongo.__version__)
- Run the
test.pyfile with the Python runtime:
python myonlinestore\test.py
It should print the version of the pymongo driver, here is the output from me:
(myonlinestore) C:\Users\borislav>python myonlinestore\test.py
4.17.0
Connecting to MongoDB
PyMongo uses MongoClient as the primary entry point. The default local connection string is mongodb://localhost:27017/.
It is a good practice to store your connection connection data in as Environment Variables or an .env file. Create .env file in your project directory with the following contents:
MONGO_URI = "mongodb://localhost:27017/"
If you are using MongoDB Atlas, you can use your Atlas connection string instead.
In order to be able to read the .env file contents from your Python app, you need to install the python-dotenv library.
pip install python-dotenv
To connect to MongoDB from your Python script, by using the MONGO_URI, defined in your .env file:
import os
from dotenv import load_dotenv
load_dotenv()
#in case MONGO_URI is not present, we provide a default value: "mongodb://localhost:27017/"
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/")
client = MongoClient(MONGO_URI)
Database and Collection access
MongoDB lazily creates databases and collections — they are not created until a document is inserted.
# Access (or create) a database
db = client["onlinestore_db"]
# Access (or create) a collection
products = db["products"]
# Introspect existing databases and collections
print(client.list_database_names())
print(db.list_collection_names())
CRUD Operations
1. Create (Insert)
#Insert single document
result = products.insert_one({
"name" : "Apple iPhone 17 Pro Max",
"qty" : 10,
"category" : "Mobile Phones",
"price" : 1400
})
print(f"Inserted ID: {result.inserted_id}")
#Insert multiple documents
new_products = [{
"name" : "Samsung Galaxy S25",
"qty" : 8,
"category" : "Mobile Phones",
"price" : 700
},
{
"name" : "Apple iPhone 17 Air",
"qty" : 5,
"category" : "Mobile Phones",
"price" : 1000
},
{
"name" : "Apple MacBook Pro M5 Pro 14",
"qty" : 5,
"category" : "Laptops",
"price" : 2500
},
{
"name" : "Lenovo Thinkpad T16 Gen 2",
"qty" : 12,
"category" : "Laptops",
"price" : 1800
}
]
result = products.insert_many(new_products)
print(f"Inserted IDs: {result.inserted_ids}")
2. Read (Find)
# Find one document, whose `name` contains `Lenovo`
doc = products.find_one({"name": {"$regex" : "Lenovo", "$options" : "i"}})
print(doc)
# Find all documents matching a filter and print their name and price only:
mobile_phones = products.find({"category": "Mobile Phones"})
for mp in mobile_phones:
print(f"Name: {mp["name"]} , Price: {mp["price"]}")
3. Update
# Update one document
products.update_one(
{"name": "Apple iPhone 17 Air"},
{"$set" : {"price" : 900} }
)
#Update all documents, whose name contains Apple (case insensitive)
products.updateMany(
{"name": {"$regex": "Apple", "$options" : "i"}}
{"$set" : {"tags" : ["apple"]} }
)
4. Delete
# Delete one (by Name)
products.delete_one({"name": "Apple MacBook Pro M5 Pro 14"})
# Delete one (by ObjectID)
id = "69eda8ccc864579625134a73"
oid = ObjectId(id)
products.delete_one({"_id": oid})
# Delete many
products.delete_many({"tags": "apple"})
Aggregation Pipeline
agg_pipeline = [
#Stage 1: Filter
{"$match" : {
"price" : {"$gt" : 500}
}
},
#Stage 2: Group all products by "category" and count the quantity for all items per "category"
{"$group" : {
"_id" : "$category",
"total_items_qty_per_category" : {"$sum" : "$qty"}
}},
#Stage 3: Sort by category name, i.e. after $group stage, this is the "_id"
{"$sort" : {"_id" : 1} }
]
products_count_per_cat = products.aggregate(pipeline)
for p in products_count_per_cat:
print(p)
The Simple Application
Below is the sample Menu-style Python Application for the myonlinestore project. The Python app contains CRUD operations and one Aggregation over the products collection of the onlinestore_db.
In the main method, we create an instance of the MyOnlineStore class. From within the constructor (i.e. __init__()), we create a MongoClient, and then use it to get a reference to the onlinestore_db and the products collection and store them into member-variables. Thus, we create a single MongoClient per client (a.k.a Console App Instance) and not per separate DB operation which is an expensive operation. We don’t need to worry about connection pooling - the PyMongo driver has internal connection pooling mechanism.
You can run the app, from the Command Prompt by executing the following command:
python my_online_store.py
However, keep in mind that the Virtual Environment for the project must be activated for your Command Prompt instance, otherwise, you will get an error.
To activate the Virtual Environment, you must run the activate script from your Command Prompt instance:
myonlinestore\Scripts\activate
Also, you can use VSCode + the Official Python Extension from Microsoft for easier development.
# my_online_store.py
import os
from dotenv import load_dotenv
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError, PyMongoError
from bson import ObjectId
load_dotenv()
#in case MONGO_URI is not present, we provide a default value: "mongodb://localhost:27017/"
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/")
DB_NAME = "onlinestore_db"
def get_client():
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=8000)
return client
class MyOnlineStore:
def __init__(self):
self.client = get_client()
self.database = self.client[DB_NAME]
self.col = self.database["products"]
# Ensure unique index on product's name
self.col.create_index("name", unique=True)
def close_connection(self):
self.client.close()
def add_product(self, name: str, category: str, price: int, qty: int) -> str:
doc = {"name": name, "category": category, "price": price, "qty": qty}
try:
result = self.col.insert_one(doc)
return str(result.inserted_id)
except DuplicateKeyError:
raise ValueError(f"Product with Name: '{name}' already exists.")
def find_product_by_id(self, id: str) -> dict:
try:
oid = ObjectId(id)
query = {"_id": oid}
return self.col.find_one(query)
except:
#in case the provided "id" is not valid ObjectId string
print("Invalid ID format")
return None
def print_product_details(self, product: dict):
if not product:
print("No data for product")
return
print(f"Product ID: {product["_id"]}")
print(f"Product Name: {product["name"]}")
print(f"Product Category: {product["category"]}")
print(f"Product Price: {product["price"]}")
print(f"Product Quantity: {product["qty"]}")
def update_product(self, id: str, new_name: str, new_category: str, new_price: int, new_quantity: int):
try:
oid = ObjectId(id)
query = {"_id": oid}
prod_to_update = self.col.find_one(query)
except:
#in case the provided "id" is not valid ObjectId string
print("Invalid ID format.")
return None
if not prod_to_update:
print("Product not found.")
return
update_data = {}
#update only when new_name is different than current name, and new name is not null or empty
if new_name != prod_to_update.get("name"): update_data["name"] = new_name
if new_category != prod_to_update.get("category"): update_data["category"] = new_category
if new_price != prod_to_update.get("price"): update_data["price"] = new_price
if new_quantity != prod_to_update.get("qty"): update_data["qty"] = new_quantity
if update_data:
result = self.col.update_one({"_id": oid}, {"$set": update_data})
print(f"Updated {result.modified_count} product. New values for {list(update_data.keys())}")
else:
print("Product was not updated.")
def delete_product_by_id(self, id: str):
try:
oid = ObjectId(id)
except:
#in case the provided "id" is not valid ObjectId string
print("Invalid ID format.")
return None
query = {"_id": oid}
delete_data = self.col.delete_one(query)
if delete_data.deleted_count > 0:
print(f"{delete_data.deleted_count} product(s) were deleted")
else:
print(f"Unable to delete product with ID {id}")
def find_products_by_name(self, name: str) -> list[dict]:
query = {"name": {"$regex": name, "$options" : "i"}}
return self.col.find(query)
def get_all_products(self) -> list[dict]:
return self.col.find()
def aggregate_product_items_per_cat(self) -> list[dict]:
agg_pipeline = [
#Stage 1: Filter
{
"$match" : {
"price" : {"$gt" : 500}
}
},
#Stage 2: Group all products by "category" and count the quantity for all items per "category"
{
"$group" : {
"_id" : "$category",
"total_items_qty_per_category" : {"$sum" : "$qty"}
}
},
#Stage 3: Sort by category name, i.e. after $group stage, this is the "_id"
{
"$sort" : {"_id" : 1}
}
]
return self.col.aggregate(agg_pipeline)
# ── Main ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
store = MyOnlineStore()
while True:
print("\n1. Add product " \
"\n2. Find Product by Name " \
"\n3. Find all Products " \
"\n4. Update product "
"\n5. Delete product by ID"
"\n6. Get all product items count per category, with price greater than 500. " \
"\n7. Exit")
choice = input("Enter your choice: ")
if choice == '1':
name = input("Product Name: ")
category = input("Category: ")
price = int(input("Price: "))
quantity = int(input("Quantity: "))
new_prod_id = store.add_product(name, category, price, quantity)
print(f"Successfully added product with ID: {new_prod_id}")
elif choice == '2':
name = input("Type a Product Name or part of it: ")
products = store.find_products_by_name(name)
for p in products:
print(p)
elif choice == '3':
products = store.get_all_products()
for p in products:
print(p)
elif choice == '4':
id = input("Type a Product ID to update: ")
product_to_update = store.find_product_by_id(id)
if not product_to_update:
continue
store.print_product_details(product_to_update)
name = input("New Name: (press Enter to keep old): ") or product_to_update['name']
category = input("New Category: (press Enter to keep old): ") or product_to_update['category']
try:
price_in = input("New Price: (press Enter to keep old): ")
price = int(price_in) if price_in else product_to_update['price']
qty_in = input("New Quantity: (press Enter to keep old): ")
qty = int(qty_in) if qty_in else product_to_update['qty']
store.update_product(id, name, category, price, qty)
except ValueError:
print("Price and Quantity must be Integer numbers.")
elif choice == '5':
id = input("Type a Product ID to delete: ")
store.delete_product_by_id(id)
elif choice == '6':
agg_result = store.aggregate_product_items_per_cat()
for r in agg_result:
print(r)
elif choice == '7':
store.close_connection()
print("Bye!")
break
else:
print("Invalid choice. Please try again.")