Test inserting values into mongodb (pyspark, pymongo)

Ask Time:2019-08-20T20:42:54         Author:EuRBamarth

I would like to (locally) test inserting some values into a mongo database. If I run this:

import pymongo
import mongomock

@mongomock.patch(
    servers=(("mongodb://null:null@localhost/test", 27017),), on_new="pymongo"
)
def get_mongodb_table():
    return pymongo.MongoClient('mongodb://null:null@localhost/test')['test']['table']

table = get_mongodb_table()
table.insert_one({'a': 'b'})  # This works!
table.find_one({})

{'a': 'b', '_id': ObjectId('5d5be9e853f24bf46d268d78')}

However, the following fails:

import pyspark
import pymongo
import mongomock

SC = pyspark.SparkContext()

@mongomock.patch(
    servers=(("mongodb://null:null@localhost/test", 27017),), on_new="pymongo"
)
def get_mongodb_table():
    return pymongo.MongoClient('mongodb://null:null@localhost/test')['test']['table']

table = get_mongodb_table()

rdd = SC.parallelize([{'a': 0, 'b': 1}])
rdd.foreach(table.insert_one)  # This doesn't work!

PicklingError: Could not serialize object: TypeError: 'Database' object is not callable

How can I fix the test so the error isn't raised? How can I test inserting a dict from an RDD into a Mongo database?

Author:EuRBamarth, reproduced under the CC 4.0 BY-SA copyright license with a link to the original source and this disclaimer.
Link to original article:https://stackoverflow.com/questions/57573964/test-inserting-values-into-mongodb-pyspark-pymongo
Jonathan Myers :

You create your Mongo connection on the driver, outside the RDD operation, and then reference it inside that operation. Spark has to serialize everything the function passed to foreach refers to so it can ship it to the workers, and it is unable to do that for the connection because of the properties of a Database object.

How to resolve: You will need to create your connection inside the function that processes the RDD.

import pyspark
import pymongo
import mongomock

SC = pyspark.SparkContext()

@mongomock.patch(
    servers=(("mongodb://null:null@localhost/test", 27017),), on_new="pymongo"
)
def get_mongodb_table():
    return pymongo.MongoClient('mongodb://null:null@localhost/test')['test']['table']

def create_and_insert(x):
    table = get_mongodb_table()
    table.insert_one(x)

rdd = SC.parallelize([{'a': 0, 'b': 1}])
rdd.foreach(create_and_insert)

However, I highly recommend using foreachPartition instead of foreach when uploading to a database. With foreach, create_and_insert opens a separate connection for every element. With foreachPartition, you can open one connection per partition of elements, which will be a significantly smaller number of connections once the RDD holds more elements than the single one here.
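
The foreachPartition recommendation can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the answer author's code: `insert_partition`, `FakeTable`, `open_fake_table` and the `batch_size` default are hypothetical names introduced here, and the in-memory `FakeTable` only stands in for a real collection so the block runs without Spark or MongoDB installed.

```python
from itertools import islice


def insert_partition(rows, get_table, batch_size=500):
    """Insert every dict in `rows` through a single table handle.

    `get_table` is a zero-argument callable that opens the connection.
    It is called here, inside the function that runs on the worker, so
    no connection object has to be pickled on the driver.
    """
    table = get_table()  # one connection per partition, not per row
    rows = iter(rows)
    inserted = 0
    while True:
        batch = list(islice(rows, batch_size))
        if not batch:  # pymongo's insert_many rejects an empty list
            break
        table.insert_many(batch)
        inserted += len(batch)
    return inserted


class FakeTable:
    """Hypothetical in-memory stand-in for a collection (demo only)."""

    def __init__(self):
        self.docs = []
        self.calls = 0

    def insert_many(self, documents):
        if not documents:
            raise TypeError("documents must be a non-empty list")
        self.calls += 1
        self.docs.extend(documents)


opened = []  # records how many "connections" the demo opens


def open_fake_table():
    table = FakeTable()
    opened.append(table)
    return table


# Simulate one partition of five rows, written in batches of two.
partition = iter([{"a": i} for i in range(5)])
count = insert_partition(partition, open_fake_table, batch_size=2)
print(count, len(opened), opened[0].calls)  # 5 1 3
```

With Spark, the same function would be passed along the lines of `rdd.foreachPartition(lambda rows: insert_partition(rows, get_mongodb_table))`, reusing the patched `get_mongodb_table` from the answer. Batching with `insert_many` is a design choice of this sketch rather than something the answer prescribes; calling `insert_one` for each row on the single per-partition table would also work.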
2019-08-20T13:45:13