diff --git a/keepercommander/commands/nested_share_folder/record_commands.py b/keepercommander/commands/nested_share_folder/record_commands.py index bbcdfe005..31663ac3b 100644 --- a/keepercommander/commands/nested_share_folder/record_commands.py +++ b/keepercommander/commands/nested_share_folder/record_commands.py @@ -16,11 +16,13 @@ (create, update, link/unlink, shortcut management, delete). """ +import json import logging -from typing import List +from typing import Any, Dict, List, Optional from ..base import Command, GroupCommand from ..record_edit import RecordEditMixin, record_fields_description, ParsedFieldValue +from ...enforcement import PasswordComplexityEnforcer, RecordTypeEnforcer from ...error import CommandError from ... import nested_share_folder as _nsf, vault from .helpers import ( @@ -48,6 +50,7 @@ class NestedShareRecordAddCommand(Command, RecordEditMixin): def __init__(self): super().__init__() + RecordEditMixin.__init__(self) def get_parser(self): return nested_share_record_add_parser @@ -64,12 +67,17 @@ def execute(self, params, **kwargs): if not record_type: raise CommandError('nsf-record-add', 'Record type parameter is required.') + RecordTypeEnforcer.enforce(params, record_type, 'nsf-record-add') + + self.warnings.clear() + self._password_policy = PasswordComplexityEnforcer.get_policy(params) + notes = kwargs.get('notes') record_fields, add_attachments = self._parse_fields(kwargs.get('fields', [])) folder_uid = self._resolve_folder(params, kwargs.get('folder_uid')) - self.warnings.clear() data = self._build_record_data(params, record_type, title, notes, record_fields) + self._check_password_policy(params, data, **kwargs) if self.warnings: for w in self.warnings: @@ -165,6 +173,13 @@ def _legacy_to_data(record, title, notes=None): data['notes'] = notes return data + def _check_password_policy(self, params, data, **kwargs): + pw_failures = PasswordComplexityEnforcer.validate_record(params, data) + for failure in pw_failures: + self.on_warning(failure) + if pw_failures and not kwargs.get('force'): + self.on_warning('Use --force to bypass password policy warnings.') + # ══════════════════════════════════════════════════════════════════════════ # nsf-record-update @@ -175,6 +190,7 @@ class NestedShareRecordUpdateCommand(Command, RecordEditMixin): def __init__(self): super().__init__() + RecordEditMixin.__init__(self) def get_parser(self): return nested_share_record_update_parser @@ -190,7 +206,7 @@ def _resolve_field_value(self, parsed): action_params.clear() if self.is_generate_value(raw, action_params): if parsed.type == 'password': - return self.generate_password(action_params) + return self.generate_password(action_params, policy=self._password_policy) if parsed.type in ('oneTimeCode', 'otp'): return self.generate_totp_url() return raw @@ -208,8 +224,12 @@ def execute(self, params, **kwargs): if not record_uids: raise CommandError('nsf-record-update', 'Record UID is required (use -r or --record)') + self.warnings.clear() + self._password_policy = PasswordComplexityEnforcer.get_policy(params) + record_type = kwargs.get('record_type') if record_type and record_type not in ('legacy', 'general'): + RecordTypeEnforcer.enforce(params, record_type, 'nsf-record-update') rt_fields = self.get_record_type_fields(params, record_type) if not rt_fields: raise CommandError('nsf-record-update', f'Record type "{record_type}" cannot be found.') @@ -245,6 +265,20 @@ def execute(self, params, **kwargs): ensure_nested_share_record(params, record_uid, 'nsf-record-update', identifier=identifier) check_record_edit_permission(params, record_uid, 'nsf-record-update') + merged = self._merge_update_data( + params, record_uid, + title=kwargs.get('title'), + record_type=record_type, + fields=fields or None, + notes=kwargs.get('notes'), + ) + self._check_password_policy(params, merged, **kwargs) + if self.warnings: + for w in self.warnings: + logging.warning(w) + if not kwargs.get('force'): + return + self.warnings.clear() result = _nsf.update_record_v3( params=params, record_uid=record_uid, title=kwargs.get('title'), record_type=record_type, @@ -253,6 +287,53 @@ def execute(self, params, **kwargs): check_result(result, 'nsf-record-update') params.sync_data = True + def _check_password_policy(self, params, data, **kwargs): + pw_failures = PasswordComplexityEnforcer.validate_record(params, data) + for failure in pw_failures: + self.on_warning(failure) + if pw_failures and not kwargs.get('force'): + self.on_warning('Use --force to bypass password policy warnings.') + + @staticmethod + def _load_record_data(params, record_uid): # type: (Any, str) -> Optional[Dict] + rec = params.record_cache.get(record_uid) or {} + raw = rec.get('data_unencrypted') + if raw is None: + nsf_data = getattr(params, 'nested_share_record_data', {}).get(record_uid) or {} + raw = nsf_data.get('data_json') + if raw is None: + return None + if isinstance(raw, bytes): + return json.loads(raw.decode('utf-8')) + if isinstance(raw, str): + return json.loads(raw) + if isinstance(raw, dict): + return raw.copy() + return None + + @classmethod + def _merge_update_data(cls, params, record_uid, title=None, record_type=None, + fields=None, notes=None): # type: (...) -> Dict + existing = cls._load_record_data(params, record_uid) + data = existing.copy() if existing else {'fields': []} + if title is not None: + data['title'] = title + if record_type is not None: + data['type'] = record_type + if fields is not None: + by_type = {} + for ef in data.get('fields', []): + by_type.setdefault(ef.get('type'), []).append(ef) + for ft, fv in fields.items(): + fv = fv if isinstance(fv, list) else [fv] + if ft in by_type and by_type[ft]: + by_type[ft][0]['value'] = fv + else: + data.setdefault('fields', []).append({'type': ft, 'value': fv}) + if notes is not None: + data['notes'] = notes + return data + # ══════════════════════════════════════════════════════════════════════════ # nsf-ln diff --git a/keepercommander/commands/record_edit.py b/keepercommander/commands/record_edit.py index 9781bf431..10b62b5f9 100644 --- a/keepercommander/commands/record_edit.py +++ b/keepercommander/commands/record_edit.py @@ -292,7 +292,7 @@ def assign_legacy_fields(self, record, fields): elif parsed_field.type == 'password': action_params.clear() if self.is_generate_value(parsed_field.value, action_params): - record.password = self.generate_password(action_params) + record.password = self.generate_password(action_params, policy=self._password_policy) elif self.is_base64_value(parsed_field.value, action_params): if action_params: record.password = action_params[0] diff --git a/keepercommander/enforcement.py b/keepercommander/enforcement.py index 6d7e802ca..4609568f9 100644 --- a/keepercommander/enforcement.py +++ b/keepercommander/enforcement.py @@ -18,7 +18,7 @@ from typing import Tuple, Optional, List, Dict, Any, Set from . import api, utils, crypto -from .proto import APIRequest_pb2 +from .proto import APIRequest_pb2, record_pb2 from .display import bcolors from .params import KeeperParams from .error import KeeperApiError, CommandError @@ -528,18 +528,30 @@ def get_restricted_record_types(cls, params): # type: (KeeperParams) -> Option cache = getattr(params, 'record_type_cache', None) or {} restricted = set() # type: Set[str] - for rt_id in list(policy.get('std') or []) + list(policy.get('ent') or []): - entry = cache.get(rt_id) - if not entry: - continue - try: - schema = json.loads(entry) if isinstance(entry, str) else entry - except (json.JSONDecodeError, TypeError): - continue - if isinstance(schema, dict): - name = schema.get('$id') - if name: - restricted.add(name) + scope_buckets = ( + ('std', record_pb2.RT_STANDARD), + ('ent', record_pb2.RT_ENTERPRISE), + ) + for bucket, scope in scope_buckets: + for rt_id in policy.get(bucket) or []: + try: + rt_id = int(rt_id) + except (TypeError, ValueError): + continue + # Role policy stores bare recordTypeId; sync_down keys cache by + # recordTypeId + scope * 1_000_000. + scoped_id = rt_id + scope * 1_000_000 + entry = cache.get(scoped_id) or cache.get(rt_id) + if not entry: + continue + try: + schema = json.loads(entry) if isinstance(entry, str) else entry + except (json.JSONDecodeError, TypeError): + continue + if isinstance(schema, dict): + name = schema.get('$id') + if name: + restricted.add(name) return restricted @classmethod diff --git a/unit-tests/test_nested_share_folder.py b/unit-tests/test_nested_share_folder.py index 94ca1b549..c5bd7b591 100644 --- a/unit-tests/test_nested_share_folder.py +++ b/unit-tests/test_nested_share_folder.py @@ -6,6 +6,7 @@ - Key utility/parsing functions """ +import json import os import time from unittest import TestCase, mock @@ -283,6 +284,160 @@ def test_add_record(self, mock_create): title='New Record', folder_uid=fuid, force=True, record_type='general', fields=[]) + @patch('keepercommander.commands.nested_share_folder.record_commands._nsf.create_record_v3') + def test_add_record_rejects_restricted_record_type(self, mock_create): + from keepercommander.commands.nested_share_folder import NestedShareRecordAddCommand + fuid, fobj = _make_folder() + params = _make_params( + nested_share_folders={fuid: fobj}, + record_type_cache={1: json.dumps({'$id': 'login'})}, + enforcements={ + 'jsons': [{'key': 'restrict_record_types', 'value': '{"std": [1], "ent": []}'}], + }, + ) + cmd = NestedShareRecordAddCommand() + with self.assertRaises(CommandError) as ctx: + cmd.execute(params, title='Blocked', record_type='login', fields=[], force=True) + self.assertIn('restricted', str(ctx.exception).lower()) + mock_create.assert_not_called() + + @patch('keepercommander.commands.nested_share_folder.record_commands._nsf.create_record_v3') + def test_add_record_rejects_weak_password_without_force(self, mock_create): + from keepercommander.commands.nested_share_folder import NestedShareRecordAddCommand + fuid, fobj = _make_folder() + params = _make_params( + nested_share_folders={fuid: fobj}, + enforcements={ + 'jsons': [{ + 'key': 'generated_password_complexity', + 'value': json.dumps([{ + 'length': 12, + 'lower-use': True, 'lower-min': 1, + 'upper-use': True, 'upper-min': 1, + 'digit-use': True, 'digit-min': 1, + }]), + }], + }, + ) + cmd = NestedShareRecordAddCommand() + cmd.execute(params, title='Weak', record_type='general', + fields=['password=abc'], force=False) + mock_create.assert_not_called() + + @patch('keepercommander.commands.nested_share_folder.record_commands._nsf.create_record_v3') + def test_add_record_allows_weak_password_with_force(self, mock_create): + from keepercommander.commands.nested_share_folder import NestedShareRecordAddCommand + mock_create.return_value = { + 'record_uid': utils.generate_uid(), 'status': 'SUCCESS', + 'message': '', 'success': True, 'revision': 1, + } + fuid, fobj = _make_folder() + params = _make_params( + nested_share_folders={fuid: fobj}, + enforcements={ + 'jsons': [{ + 'key': 'generated_password_complexity', + 'value': json.dumps([{ + 'length': 12, + 'lower-use': True, 'lower-min': 1, + 'upper-use': True, 'upper-min': 1, + 'digit-use': True, 'digit-min': 1, + }]), + }], + }, + ) + cmd = NestedShareRecordAddCommand() + cmd.execute(params, title='Weak', record_type='general', + fields=['password=abc'], force=True) + mock_create.assert_called_once() + + @patch('keepercommander.commands.nested_share_folder.record_commands._nsf.create_record_v3') + def test_add_record_gen_uses_password_policy(self, mock_create): + from keepercommander.commands.nested_share_folder import NestedShareRecordAddCommand + mock_create.return_value = { + 'record_uid': utils.generate_uid(), 'status': 'SUCCESS', + 'message': '', 'success': True, 'revision': 1, + } + fuid, fobj = _make_folder() + params = _make_params( + nested_share_folders={fuid: fobj}, + enforcements={ + 'jsons': [{ + 'key': 'generated_password_complexity', + 'value': json.dumps([{ + 'length': 16, + 'lower-use': True, 'lower-min': 2, + 'upper-use': True, 'upper-min': 2, + 'digit-use': True, 'digit-min': 2, + 'special-use': True, 'special-min': 1, + 'special': '!@#$', + }]), + }], + }, + ) + cmd = NestedShareRecordAddCommand() + cmd.execute(params, title='Generated', record_type='general', + fields=['password=$GEN'], force=False) + mock_create.assert_called_once() + record_data = mock_create.call_args.kwargs['record_data'] + password = next( + v[0] for f in record_data['fields'] + if f.get('type') == 'password' for v in [f.get('value', [])] if v + ) + self.assertGreaterEqual(len(password), 16) + self.assertGreaterEqual(sum(1 for c in password if c.islower()), 2) + self.assertGreaterEqual(sum(1 for c in password if c.isupper()), 2) + self.assertGreaterEqual(sum(1 for c in password if c.isdigit()), 2) + self.assertGreaterEqual(sum(1 for c in password if c in '!@#$'), 1) + + @patch('keepercommander.commands.nested_share_folder.record_commands._nsf.update_record_v3') + @patch('keepercommander.commands.nested_share_folder.helpers.check_record_edit_permission') + def test_update_record_rejects_restricted_record_type(self, mock_perm, mock_update): + from keepercommander.commands.nested_share_folder import NestedShareRecordUpdateCommand + ruid, robj = _make_record() + params = _make_params( + nested_share_records={ruid: robj}, + record_cache={ruid: {'revision': 1, 'data_unencrypted': json.dumps({ + 'type': 'login', 'title': 'Old', 'fields': [], + })}}, + record_type_cache={1: json.dumps({'$id': 'login'})}, + enforcements={ + 'jsons': [{'key': 'restrict_record_types', 'value': '{"std": [1], "ent": []}'}], + }, + ) + cmd = NestedShareRecordUpdateCommand() + with self.assertRaises(CommandError) as ctx: + cmd.execute(params, record_uids=[ruid], record_type='login', fields=[]) + self.assertIn('restricted', str(ctx.exception).lower()) + mock_update.assert_not_called() + + @patch('keepercommander.commands.nested_share_folder.record_commands._nsf.update_record_v3') + @patch('keepercommander.commands.nested_share_folder.helpers.check_record_edit_permission') + def test_update_record_rejects_weak_password_without_force(self, mock_perm, mock_update): + from keepercommander.commands.nested_share_folder import NestedShareRecordUpdateCommand + ruid, robj = _make_record() + params = _make_params( + nested_share_records={ruid: robj}, + record_cache={ruid: {'revision': 1, 'data_unencrypted': json.dumps({ + 'type': 'login', 'title': 'Old', + 'fields': [{'type': 'password', 'value': ['ExistingPass123']}], + })}}, + enforcements={ + 'jsons': [{ + 'key': 'generated_password_complexity', + 'value': json.dumps([{ + 'length': 12, + 'lower-use': True, 'lower-min': 1, + 'upper-use': True, 'upper-min': 1, + 'digit-use': True, 'digit-min': 1, + }]), + }], + }, + ) + cmd = NestedShareRecordUpdateCommand() + cmd.execute(params, record_uids=[ruid], fields=['password=abc'], force=False) + mock_update.assert_not_called() + @patch('keepercommander.nested_share_folder.folder_record_api.add_record_to_folder_v3') def test_add_record_to_folder(self, mock_add): pass