Processing a list with queued tasks. How to make sure each value in the list is used once and only once?

If you do not have a huge number of profile elements in your project records, Schedule_daily_projects_tasks() can join all of your profile elements into a serialized string and pass that string to the task queue as a parameter. Each task pops one element from the string, processes it, and, if no exception occurs, re-enqueues the task with the shorter serialized string. If you are updating the same entity each time, enqueue the task with a delay of about two seconds. This also helps regulate the number of instances spun up to handle your task queue, which can keep costs down.
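A rough sketch of that pattern, assuming a worker handler at /process_next and a comma-separated serialization (the handler path, the delimiter, and the process() helper are illustrative, not from the original):

from google.appengine.api import taskqueue
from google.appengine.ext import webapp


class ProcessNext(webapp.RequestHandler):
    # Hypothetical worker: handles exactly one element, then re-enqueues the rest.
    def post(self):
        remaining = self.request.get('remaining')   # e.g. "foo,bar,baz"
        if not remaining:
            return
        elements = remaining.split(',')
        current = elements.pop()                    # take exactly one element
        process(current)                            # placeholder for the per-element work
        if elements:
            # Re-enqueue with the shorter string; the countdown spaces out
            # writes to the same entity and limits instance spin-up.
            taskqueue.add(url='/process_next',
                          params={'remaining': ','.join(elements)},
                          countdown=2)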

The problem you're encountering is more or less as you describe: you've enqueued multiple tasks that are trying to concurrently modify the same datastore object. Since you're not using transactions, multiple tasks end up retrieving the same data, doing the same operations, then overwriting each other's results.



You could use a datastore transaction to avoid this, but a much better solution would be to restructure your tasks so that only a single task is modifying each datastore entity. That way, you have no synchronization or transaction issues to worry about.
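If you do go the transaction route, the read-modify-write has to happen inside the transaction function so that concurrent tasks touching the same entity are serialized. A minimal sketch with the old db API (the profile entity and its targets field are assumptions based on the snippets below):

from google.appengine.ext import db

def _remove_target(profile_key, screen_name):
    # Runs inside a transaction: the get and put are atomic, so two tasks
    # working on the same profile cannot overwrite each other's changes.
    pro = db.get(profile_key)
    if screen_name in pro.targets:
        pro.targets.remove(screen_name)
        pro.put()

db.run_in_transaction(_remove_target, key, screen_name)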

You might want to consider adding your tasks as a batch:



from google.appengine.api import taskqueue

targets = pro.targets
tasks = []
...
screen_name = targets.pop()
tasks.append(taskqueue.Task(url='/url_to_my_worker',
                            params={'profk': key, 'screen_name': screen_name},
                            eta=tim))
...
pro.put()
taskqueue.Queue().add(tasks)  # add the whole batch to the default queue in one call
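On the worker side, each task then only sees its own screen_name, so no two tasks need to touch the same per-target state. A minimal handler sketch (the parameter names carry over from the batch above; the do_work() helper is a placeholder):

class MyWorker(webapp.RequestHandler):
    def post(self):
        profk = self.request.get('profk')
        screen_name = self.request.get('screen_name')
        # One screen_name per task: the work for each target is isolated.
        do_work(profk, screen_name)   # placeholder for the per-target processing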


Note that you can prevent tasks on a particular queue from running concurrently by creating a queue.yaml that sets max_concurrent_requests to 1:



queue:
- name: default
  max_concurrent_requests: 1

Python comes with a built-in thread-safe Queue library module.



http://docs.python.org/library/queue.html




The Queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics. It depends on the availability of thread support in Python; see the threading module.




You can make it so that a get() call simply blocks until an item is available; the module handles the locking for you.
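For illustration, a minimal multi-threaded producer/consumer sketch using the standard library (Python 2 names, matching the Queue import below; nothing here is specific to App Engine):

import Queue
import threading

q = Queue.Queue()

def worker():
    while True:
        item = q.get(block=True)   # blocks until an item is available
        if item is None:           # sentinel telling the worker to stop
            break
        print 'processing %s' % item
        q.task_done()

t = threading.Thread(target=worker)
t.start()
for item in ['foo', 'bar', 'baz']:
    q.put(item)
q.put(None)
t.join()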



Edit: The example page linked uses the Queue module to do its work.



import Queue

targets = ['foo', 'bar', 'foo2', 'bar2', 'foo3', 'bar3', 'foo4', 'bar4', 'foo5', 'bar5']
queue = Queue.LifoQueue()  # last in, first out
for target in targets:
    queue.put(target)
myproject1.targets = queue

##########################

class schedule_daily_profile_tasks(webapp.RequestHandler):
    ...
    screen_name = pro.targets.get(block=True)  # blocks until an item is available

