From 60bf2d0a9890e484a7c70e1de178f5600ca2db39 Mon Sep 17 00:00:00 2001 From: Michał Górny Date: Sat, 7 Sep 2013 00:05:24 +0200 Subject: Use context managers to clean up settings.DATABASES after binds. --- okupy/accounts/views.py | 418 +++++++++++++++++++------------------- okupy/common/ldap_helpers.py | 55 +++-- okupy/tests/unit/test_ldapuser.py | 38 ++-- 3 files changed, 262 insertions(+), 249 deletions(-) (limited to 'okupy') diff --git a/okupy/accounts/views.py b/okupy/accounts/views.py index 96b2614..ab96d87 100644 --- a/okupy/accounts/views.py +++ b/okupy/accounts/views.py @@ -376,245 +376,245 @@ def activate(request, token): @otp_required def profile_settings(request): """ Primary account settings, """ - user_info = get_bound_ldapuser(request) - profile_settings = None - if request.method == "POST": - profile_settings = ProfileSettingsForm(request.POST) - if profile_settings.is_valid(): - try: - #birthday = profile_settings.cleaned_data['birthday'] - first_name = profile_settings.cleaned_data['first_name'] - last_name = profile_settings.cleaned_data['last_name'] - - if user_info.first_name != first_name: - user_info.first_name = first_name - - if user_info.last_name != last_name: - user_info.last_name = last_name - - user_info.full_name = '%s %s' % (first_name, last_name) - user_info.gecos = '%s %s' % (first_name, last_name) - - """ - if user_info.birthday != birthday: - user_info.birthday = birthday - """ + with get_bound_ldapuser(request) as user_info: + profile_settings = None + if request.method == "POST": + profile_settings = ProfileSettingsForm(request.POST) + if profile_settings.is_valid(): try: - user_info.save() - except IntegrityError: + #birthday = profile_settings.cleaned_data['birthday'] + first_name = profile_settings.cleaned_data['first_name'] + last_name = profile_settings.cleaned_data['last_name'] + + if user_info.first_name != first_name: + user_info.first_name = first_name + + if user_info.last_name != last_name: + user_info.last_name = last_name + + user_info.full_name = '%s %s' % (first_name, last_name) + user_info.gecos = '%s %s' % (first_name, last_name) + + """ + if user_info.birthday != birthday: + user_info.birthday = birthday + """ + try: + user_info.save() + except IntegrityError: + pass + except ldap.TYPE_OR_VALUE_EXISTS: pass - except ldap.TYPE_OR_VALUE_EXISTS: - pass - except Exception as error: - logger.critical(error, extra=log_extra_data(request)) - logger_mail.exception(error) - raise OkupyError("Can't contact LDAP server") - else: - profile_settings = ProfileSettingsForm() + except Exception as error: + logger.critical(error, extra=log_extra_data(request)) + logger_mail.exception(error) + raise OkupyError("Can't contact LDAP server") + else: + profile_settings = ProfileSettingsForm() - return render(request, 'settings-profile.html', { - 'profile_settings': profile_settings, - 'user_info': user_info, - }) + return render(request, 'settings-profile.html', { + 'profile_settings': profile_settings, + 'user_info': user_info, + }) @strong_auth_required @otp_required def password_settings(request): """ Password settings """ - user_info = get_bound_ldapuser(request) - password_settings = None - if request.method == "POST": - password_settings = PasswordSettingsForm(request.POST) - if password_settings.is_valid(): - try: - new_password = password_settings.cleaned_data['new_password'] - new_password_verify = password_settings.cleaned_data[ - 'new_password_verify'] - old_password = password_settings.cleaned_data['old_password'] - - if old_password and (new_password == new_password_verify): - for hash in list(user_info.password): - print hash - try: - if ldap_md5_crypt.verify(old_password, hash): - user_info.password.append( - ldap_md5_crypt.encrypt( - new_password_verify)) - user_info.password.remove(hash) - break - except ValueError: - # ignore unknown hashes - pass + with get_bound_ldapuser(request) as user_info: + password_settings = None + if request.method == "POST": + password_settings = PasswordSettingsForm(request.POST) + if password_settings.is_valid(): try: - user_info.save() - except IntegrityError: + new_password = password_settings.cleaned_data['new_password'] + new_password_verify = password_settings.cleaned_data[ + 'new_password_verify'] + old_password = password_settings.cleaned_data['old_password'] + + if old_password and (new_password == new_password_verify): + for hash in list(user_info.password): + print hash + try: + if ldap_md5_crypt.verify(old_password, hash): + user_info.password.append( + ldap_md5_crypt.encrypt( + new_password_verify)) + user_info.password.remove(hash) + break + except ValueError: + # ignore unknown hashes + pass + try: + user_info.save() + except IntegrityError: + pass + except ldap.TYPE_OR_VALUE_EXISTS: pass - except ldap.TYPE_OR_VALUE_EXISTS: - pass - except Exception as error: - logger.critical(error, extra=log_extra_data(request)) - logger_mail.exception(error) - raise OkupyError("Can't contact LDAP server") - else: - password_settings = PasswordSettingsForm() + except Exception as error: + logger.critical(error, extra=log_extra_data(request)) + logger_mail.exception(error) + raise OkupyError("Can't contact LDAP server") + else: + password_settings = PasswordSettingsForm() - return render(request, 'settings-password.html', { - 'password_settings': password_settings, - 'user_info': user_info, - }) + return render(request, 'settings-password.html', { + 'password_settings': password_settings, + 'user_info': user_info, + }) @strong_auth_required @otp_required def email_settings(request): """ Email Settings """ - user_info = get_bound_ldapuser(request) - email_settings = None - if request.method == "POST": - email_settings = EmailSettingsForm(request.POST) - if email_settings.is_valid(): - try: - email = email_settings.cleaned_data['email'] - - if request.POST.get('delete'): - user_info.email.remove(email) - else: - user_info.email.append(email) - + with get_bound_ldapuser(request) as user_info: + email_settings = None + if request.method == "POST": + email_settings = EmailSettingsForm(request.POST) + if email_settings.is_valid(): try: - user_info.save() - except IntegrityError: + email = email_settings.cleaned_data['email'] + + if request.POST.get('delete'): + user_info.email.remove(email) + else: + user_info.email.append(email) + + try: + user_info.save() + except IntegrityError: + pass + except ldap.TYPE_OR_VALUE_EXISTS: pass - except ldap.TYPE_OR_VALUE_EXISTS: - pass - except Exception as error: - logger.critical(error, extra=log_extra_data(request)) - logger_mail.exception(error) - raise OkupyError("Can't contact LDAP server") - else: - email_settings = EmailSettingsForm() + except Exception as error: + logger.critical(error, extra=log_extra_data(request)) + logger_mail.exception(error) + raise OkupyError("Can't contact LDAP server") + else: + email_settings = EmailSettingsForm() - return render(request, 'settings-email.html', { - 'email_settings': email_settings, - 'user_info': user_info, - }) + return render(request, 'settings-email.html', { + 'email_settings': email_settings, + 'user_info': user_info, + }) @strong_auth_required @otp_required def contact_settings(request): """ Contact details """ - user_info = get_bound_ldapuser(request) - contact_settings = None - if request.method == "POST": - contact_settings = ContactSettingsForm(request.POST) - if contact_settings.is_valid(): - try: - gpg_fingerprint = contact_settings.cleaned_data[ - 'gpg_fingerprint'] - im = contact_settings.cleaned_data['im'] - latitude = contact_settings.cleaned_data['latitude'] - location = contact_settings.cleaned_data['location'] - longitude = contact_settings.cleaned_data['longitude'] - phone = contact_settings.cleaned_data['phone'] - website = contact_settings.cleaned_data['website'] - - if user_info.location != location: - user_info.location = location - - if user_info.phone != phone: - user_info.phone = phone - - if user_info.website != website: - user_info.website.pop() - user_info.website.append(website) - - if user_info.im != im: - user_info.im.pop() - user_info.im.append(im) - - if user_info.longitude != longitude: - user_info.longitude = longitude - - if user_info.latitude != latitude: - user_info.latitude = latitude - - if user_info.gpg_fingerprint != gpg_fingerprint: - user_info.gpg_fingerprint.pop() - user_info.gpg_fingerprint.append(gpg_fingerprint) - + with get_bound_ldapuser(request) as user_info: + contact_settings = None + if request.method == "POST": + contact_settings = ContactSettingsForm(request.POST) + if contact_settings.is_valid(): try: - user_info.save() - except IntegrityError: + gpg_fingerprint = contact_settings.cleaned_data[ + 'gpg_fingerprint'] + im = contact_settings.cleaned_data['im'] + latitude = contact_settings.cleaned_data['latitude'] + location = contact_settings.cleaned_data['location'] + longitude = contact_settings.cleaned_data['longitude'] + phone = contact_settings.cleaned_data['phone'] + website = contact_settings.cleaned_data['website'] + + if user_info.location != location: + user_info.location = location + + if user_info.phone != phone: + user_info.phone = phone + + if user_info.website != website: + user_info.website.pop() + user_info.website.append(website) + + if user_info.im != im: + user_info.im.pop() + user_info.im.append(im) + + if user_info.longitude != longitude: + user_info.longitude = longitude + + if user_info.latitude != latitude: + user_info.latitude = latitude + + if user_info.gpg_fingerprint != gpg_fingerprint: + user_info.gpg_fingerprint.pop() + user_info.gpg_fingerprint.append(gpg_fingerprint) + + try: + user_info.save() + except IntegrityError: + pass + except ldap.TYPE_OR_VALUE_EXISTS: pass - except ldap.TYPE_OR_VALUE_EXISTS: - pass - except Exception as error: - logger.critical(error, extra=log_extra_data(request)) - logger_mail.exception(error) - raise OkupyError("Can't contact LDAP server") - else: - contact_settings = ContactSettingsForm() + except Exception as error: + logger.critical(error, extra=log_extra_data(request)) + logger_mail.exception(error) + raise OkupyError("Can't contact LDAP server") + else: + contact_settings = ContactSettingsForm() - return render(request, 'settings-contact.html', { - 'contact_settings': contact_settings, - 'user_info': user_info, - }) + return render(request, 'settings-contact.html', { + 'contact_settings': contact_settings, + 'user_info': user_info, + }) @strong_auth_required @otp_required def gentoo_dev_settings(request): """ Gentoo related information """ - user_info = get_bound_ldapuser(request) - gentoo_account_settings = None - if request.method == "POST": - gentoo_account_settings = GentooAccountSettingsForm(request.POST) - if gentoo_account_settings.is_valid(): - try: - devbug = gentoo_account_settings.cleaned_data['developer_bug'] - gentoo_join_date = gentoo_account_settings.cleaned_data[ - 'gentoo_join_date'] - gentoo_mentor = gentoo_account_settings.cleaned_data['mentor'] - ssh_pubkey = gentoo_account_settings.cleaned_data['ssh_key'] - - if user_info.developer_bug != devbug: - user_info.developer_bug.append(devbug) - - if user_info.gentoo_join_date != gentoo_join_date: - user_info.gentoo_join_date.append(gentoo_join_date) - - if user_info.mentor != gentoo_mentor: - user_info.mentor.append(gentoo_mentor) - - if ssh_pubkey: - user_info.ssh_key.append(ssh_pubkey) - - if user_info.is_retired or user_info.gentoo_retire_date: - gentoo_retire_date = gentoo_account_settings.cleaned_data[ - 'gentoo_retire_date'] - if user_info.gentoo_retire_date != gentoo_retire_date: - user_info.gentoo_retire_date.append( - gentoo_retire_date) - + with get_bound_ldapuser(request) as user_info: + gentoo_account_settings = None + if request.method == "POST": + gentoo_account_settings = GentooAccountSettingsForm(request.POST) + if gentoo_account_settings.is_valid(): try: - user_info.save() - except IntegrityError: + devbug = gentoo_account_settings.cleaned_data['developer_bug'] + gentoo_join_date = gentoo_account_settings.cleaned_data[ + 'gentoo_join_date'] + gentoo_mentor = gentoo_account_settings.cleaned_data['mentor'] + ssh_pubkey = gentoo_account_settings.cleaned_data['ssh_key'] + + if user_info.developer_bug != devbug: + user_info.developer_bug.append(devbug) + + if user_info.gentoo_join_date != gentoo_join_date: + user_info.gentoo_join_date.append(gentoo_join_date) + + if user_info.mentor != gentoo_mentor: + user_info.mentor.append(gentoo_mentor) + + if ssh_pubkey: + user_info.ssh_key.append(ssh_pubkey) + + if user_info.is_retired or user_info.gentoo_retire_date: + gentoo_retire_date = gentoo_account_settings.cleaned_data[ + 'gentoo_retire_date'] + if user_info.gentoo_retire_date != gentoo_retire_date: + user_info.gentoo_retire_date.append( + gentoo_retire_date) + + try: + user_info.save() + except IntegrityError: + pass + except ldap.TYPE_OR_VALUE_EXISTS: pass - except ldap.TYPE_OR_VALUE_EXISTS: - pass - except Exception as error: - logger.critical(error, extra=log_extra_data(request)) - logger_mail.exception(error) - raise OkupyError("Can't contact LDAP server") - else: - gentoo_account_settings = GentooAccountSettingsForm() + except Exception as error: + logger.critical(error, extra=log_extra_data(request)) + logger_mail.exception(error) + raise OkupyError("Can't contact LDAP server") + else: + gentoo_account_settings = GentooAccountSettingsForm() - return render(request, 'settings-gentoo.html', { - 'gentoo_account_settings': gentoo_account_settings, - 'user_info': user_info, - }) + return render(request, 'settings-gentoo.html', { + 'gentoo_account_settings': gentoo_account_settings, + 'user_info': user_info, + }) @strong_auth_required @@ -624,11 +624,11 @@ def otp_setup(request): secret = None conf_form = None skeys = None - user = get_bound_ldapuser(request) if request.method == 'POST': if 'disable' in request.POST: - dev.disable(user) + with get_bound_ldapuser(request) as user: + dev.disable(user) elif 'confirm' in request.POST and 'otp_secret' in request.session: secret = request.session['otp_secret'] conf_form = OTPForm(request.POST) @@ -646,11 +646,12 @@ def otp_setup(request): messages.error(request, 'Token verification failed.') conf_form = OTPForm() else: - dev.enable(user, secret) - secret = None - conf_form = None - sdev = SOTPDevice.objects.get(user=request.user) - skeys = sdev.gen_keys(user) + with get_bound_ldapuser(request) as user: + dev.enable(user, secret) + secret = None + conf_form = None + sdev = SOTPDevice.objects.get(user=request.user) + skeys = sdev.gen_keys(user) messages.info(request, 'The new secret has been set.') elif 'enable' in request.POST: secret = dev.gen_secret() @@ -658,7 +659,8 @@ def otp_setup(request): conf_form = OTPForm() elif 'recovery' in request.POST: sdev = SOTPDevice.objects.get(user=request.user) - skeys = sdev.gen_keys(user) + with get_bound_ldapuser(request) as user: + skeys = sdev.gen_keys(user) messages.info(request, 'Your old recovery keys have been revoked.') elif 'cancel' in request.POST: messages.info(request, 'Secret change aborted. Previous settings' diff --git a/okupy/common/ldap_helpers.py b/okupy/common/ldap_helpers.py index ff8cd97..5b3e76a 100644 --- a/okupy/common/ldap_helpers.py +++ b/okupy/common/ldap_helpers.py @@ -34,24 +34,21 @@ def get_bound_ldapuser(request, password=None): def set_secondary_password(request, password): """ Generate a secondary passsword and encrypt it in the session """ - user = get_bound_ldapuser(request, password) - - secondary_password = Random.get_random_bytes(48) - request.session['secondary_password'] = ( - cipher.encrypt(secondary_password)) - # Clean up possible leftover secondary passwords from the LDAP account - if len(user.password) > 1: - for hash in list(user.password): - try: - if not ldap_md5_crypt.verify(password, hash): - user.password.remove(hash) - except ValueError: - # don't remove unknown hashes - pass - # Add a new generated encrypted password to LDAP - user.password.append( - ldap_md5_crypt.encrypt(b64encode(secondary_password))) - user.save() + with get_bound_ldapuser(request, password) as user: + secondary_password = Random.get_random_bytes(48) + request.session['secondary_password'] = cipher.encrypt(secondary_password) + # Clean up possible leftover secondary passwords from the LDAP account + if len(user.password) > 1: + for hash in list(user.password): + try: + if not ldap_md5_crypt.verify(password, hash): + user.password.remove(hash) + except ValueError: + # don't remove unknown hashes + pass + # Add a new generated encrypted password to LDAP + user.password.append(ldap_md5_crypt.encrypt(b64encode(secondary_password))) + user.save() def remove_secondary_password(request): @@ -61,15 +58,15 @@ def remove_secondary_password(request): request.session['secondary_password'], 48)) except KeyError: return - user = get_bound_ldapuser(request, password) - if len(user.password) > 1: - for hash in list(user.password): - try: - if ldap_md5_crypt.verify(password, hash): - user.password.remove(hash) - break - except ValueError: - # ignore unknown hashes - pass - user.save() + with get_bound_ldapuser(request, password) as user: + if len(user.password) > 1: + for hash in list(user.password): + try: + if ldap_md5_crypt.verify(password, hash): + user.password.remove(hash) + break + except ValueError: + # ignore unknown hashes + pass + user.save() diff --git a/okupy/tests/unit/test_ldapuser.py b/okupy/tests/unit/test_ldapuser.py index a160571..410e9f1 100644 --- a/okupy/tests/unit/test_ldapuser.py +++ b/okupy/tests/unit/test_ldapuser.py @@ -44,8 +44,8 @@ class LDAPUserUnitTests(TestCase): request = set_request('/', user=vars.USER_ALICE) request.session['secondary_password'] = cipher.encrypt( secondary_password) - user = get_bound_ldapuser(request) - self.assertEqual(user.username, vars.USER_ALICE.username) + with get_bound_ldapuser(request) as user: + self.assertEqual(user.username, vars.USER_ALICE.username) def test_get_bound_ldapuser_bind_as_is_properly_set_from_request(self): secondary_password = Random.get_random_bytes(48) @@ -56,22 +56,22 @@ class LDAPUserUnitTests(TestCase): request = set_request('/', user=vars.USER_ALICE) request.session['secondary_password'] = cipher.encrypt( secondary_password) - get_bound_ldapuser(request) - db_alias = 'ldap_%s' % request.session.cache_key - self.assertEqual(settings.DATABASES[db_alias]['PASSWORD'], - b64encode(secondary_password)) + with get_bound_ldapuser(request) as user: # noqa + db_alias = 'ldap_%s' % request.session.cache_key + self.assertEqual(settings.DATABASES[db_alias]['PASSWORD'], + b64encode(secondary_password)) def test_get_bound_ldapuser_bind_as_is_properly_set_from_password(self): request = set_request('/', user=vars.USER_ALICE) - get_bound_ldapuser(request, password='ldaptest') - db_alias = 'ldap_%s' % request.session.cache_key - self.assertTrue(ldap_md5_crypt.verify(settings.DATABASES[db_alias][ - 'PASSWORD'], ldap_users('alice')[1]['userPassword'][0])) + with get_bound_ldapuser(request, password='ldaptest') as user: # noqa + db_alias = 'ldap_%s' % request.session.cache_key + self.assertTrue(ldap_md5_crypt.verify(settings.DATABASES[db_alias][ + 'PASSWORD'], ldap_users('alice')[1]['userPassword'][0])) def test_get_bound_ldapuser_password_set(self): request = set_request('/', user=vars.USER_ALICE) - user = get_bound_ldapuser(request, password='ldaptest') - self.assertEqual(user.username, vars.USER_ALICE.username) + with get_bound_ldapuser(request, password='ldaptest') as user: + self.assertEqual(user.username, vars.USER_ALICE.username) def test_get_bound_ldapuser_no_password_available(self): request = set_request('/', user=vars.USER_ALICE) @@ -89,3 +89,17 @@ class LDAPUserUnitTests(TestCase): request = set_request('/', user=vars.USER_ALICE) self.assertRaises(ldap.INVALID_CREDENTIALS, get_bound_ldapuser, request, 'test') + + def test_get_bound_ldapuser_context_manager_cleans_up_settings(self): + secondary_password = Random.get_random_bytes(48) + secondary_password_crypt = ldap_md5_crypt.encrypt(b64encode( + secondary_password)) + self.ldapobject.directory[ldap_users('alice')[0]][ + 'userPassword'].append(secondary_password_crypt) + request = set_request('/', user=vars.USER_ALICE) + request.session['secondary_password'] = cipher.encrypt( + secondary_password) + with get_bound_ldapuser(request) as user: # noqa + pass + db_alias = 'ldap_%s' % request.session.cache_key + self.assertNotIn(db_alias, settings.DATABASES) -- cgit v1.2.3-18-g5258