I would like to (locally) test inserting some values into a mongo database. If I run this:
import pymongo
import mongomock
@mongomock.patch(
    servers=(("mongodb://null:null@localhost/test", 27017),), on_new="pymongo"
)
def get_mongodb_table():
    return pymongo.MongoClient('mongodb://null:null@localhost/test')['test']['table']
table = get_mongodb_table()
table.insert_one({'a': 'b'}) # This works!
table.find_one({})
{'a': 'b', '_id': ObjectId('5d5be9e853f24bf46d268d78')}
However, the following fails:
import pyspark
import pymongo
import mongomock
SC = pyspark.SparkContext()
@mongomock.patch(
    servers=(("mongodb://null:null@localhost/test", 27017),), on_new="pymongo"
)
def get_mongodb_table():
    return pymongo.MongoClient('mongodb://null:null@localhost/test')['test']['table']
table = get_mongodb_table()
rdd = SC.parallelize([{'a': 0, 'b': 1}])
rdd.foreach(table.insert_one) # This doesn't work!
PicklingError: Could not serialize object: TypeError: 'Database' object is not callable
How can I fix the test so the error isn't raised? How can I test inserting a dict from an RDD into a Mongo database?
Jonathan Myers :
You are referencing, inside your RDD operation, a Mongo connection that you established outside of it. Spark tries to serialize this connection so that it can be used when processing the RDD, but it is unable to because of the properties of a Database object.

How to resolve: create your connection inside the function that processes the RDD.

import pyspark
import pymongo
import mongomock

SC = pyspark.SparkContext()

@mongomock.patch(
    servers=(("mongodb://null:null@localhost/test", 27017),), on_new="pymongo"
)
def get_mongodb_table():
    return pymongo.MongoClient('mongodb://null:null@localhost/test')['test']['table']

def create_and_insert(x):
    # The connection is created here, inside the function Spark runs, so nothing unpicklable is captured.
    table = get_mongodb_table()
    table.insert_one(x)

rdd = SC.parallelize([{'a': 0, 'b': 1}])
rdd.foreach(create_and_insert)

However, I highly recommend using foreachPartition instead of foreach when uploading to a database. With the code above, foreach creates a separate connection for every element. With foreachPartition you create one connection per partition of elements, which is a significantly smaller number once you have more elements than in this example.
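To illustrate the foreachPartition recommendation, here is a minimal sketch of a per-partition writer. The name insert_partition and its get_table parameter are hypothetical and not part of the original answer; get_table stands for any zero-argument function that returns a collection, such as get_mongodb_table from the question. The sketch assumes a partition fits in memory, since it collects the rows into a list before calling insert_many.

```python
def insert_partition(rows, get_table):
    """Insert all rows of one partition using a single connection.

    rows      -- iterator of dicts, as passed by rdd.foreachPartition
    get_table -- hypothetical zero-argument factory returning a collection
    """
    docs = list(rows)  # assumption: one partition fits in memory
    if not docs:
        # Partitions can be empty, and pymongo's insert_many rejects an empty list.
        return 0
    table = get_table()      # connection is created here, once per partition
    table.insert_many(docs)  # one bulk insert instead of one insert per element
    return len(docs)
```

With the names from the question, this would be called as rdd.foreachPartition(lambda rows: insert_partition(rows, get_mongodb_table)). The return value is only there to make the function easy to check on its own; foreachPartition ignores it.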
2019-08-20T13:45:13