Commit 4d69eb12 authored by Ilham Maulana's avatar Ilham Maulana 💻

fix: shit auth and book endpoint to not using drf

parent 6ea2daa0
import jwt
import json
import random
from django.utils import timezone
from django.contrib.auth import authenticate, login, logout
from django.core.mail import send_mail
from rest_framework import views, viewsets, status
from rest_framework.response import Response
from rest_framework.filters import SearchFilter
from rest_framework.authtoken.models import Token
from users.models import ResetPasswordPin
from .serializers import (
User,
Librarian,
LibrarianSerializer,
LibrarianLoginHistory,
LoginHistorySerializer,
Member,
MemberSerializer,
User,
UserSerializer,
UpdateProfileSerializer,
)
from .permissions import IsStaffUser, IsNotStaffUser
class LibrarianViewSet(viewsets.ModelViewSet):
permission_classes = [IsStaffUser]
queryset = Librarian.objects.all().order_by("created_at")
serializer_class = LibrarianSerializer
filter_backends = [SearchFilter]
search_fields = [
"user__username",
"user__email",
"user__first_name",
"user__last_name",
]
def update(self, request, pk):
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data, partial=True)
serializer.is_valid(raise_exception=True)
serializer.save()
return Response(serializer.data, status=status.HTTP_200_OK)
class LibrarianLoginHistoryViewSet(viewsets.ModelViewSet):
permission_classes = [IsStaffUser]
queryset = LibrarianLoginHistory.objects.all().order_by("date")
serializer_class = LoginHistorySerializer
filter_backends = [SearchFilter]
search_fields = ["librarian__name"]
class MemberViewSet(viewsets.ModelViewSet):
permission_classes = [IsNotStaffUser]
queryset = Member.objects.all().order_by("created_at")
serializer_class = MemberSerializer
filter_backends = [SearchFilter]
search_fields = [
"user__username",
"user__email",
"user__first_name",
"user__last_name",
]
def update(self, request, pk):
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data, partial=True)
serializer.is_valid(raise_exception=True)
serializer.save()
return Response(serializer.data, status=status.HTTP_200_OK)
class UserDetailView(views.APIView):
def get(self, request):
header = request.headers.get("Authorization")
if header is None:
return Response(
{"message": "Unauthorized"}, status=status.HTTP_401_UNAUTHORIZED
)
token = header.split(" ")[1]
verified_token = Token.objects.filter(key=token)
if not verified_token.exists():
return Response(
{"message": "Token is invalid or expired"},
status=status.HTTP_401_UNAUTHORIZED,
)
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.contrib.auth import logout, authenticate
user_id = verified_token[0].user.id
user = User.objects.get(pk=user_id)
from users.models import User, Member, ResetPasswordPin
from loans.models import Book, BookLoan
account_id = None
if user.is_staff:
account_id = user.librarian.id
else:
account_id = user.member.id
data = {
"id": user.pk,
"username": user.username,
"email": user.email,
"first_name": user.first_name,
"last_name": user.last_name,
"is_staff": user.is_staff,
"account_id": account_id,
}
@csrf_exempt
def loginUserView(request):
return Response(data, status=status.HTTP_200_OK)
if request.method == "POST":
data = json.loads(request.body)
username = data.get("username")
password = data.get("password")
class LoginBaseView(views.APIView):
def post(self, request):
user = authenticate(
username=request.data["username"], password=request.data["password"]
user = authenticate(username=username, password=password)
if user is None:
return JsonResponse(
{"message": "Username or Password is incorrect"}, status=400
)
if user:
login(request=request, user=user)
token, created = Token.objects.get_or_create(user=user)
return Response({"token": token.key})
else:
return Response({"error": "Invalid credentials"}, status=401)
class LibrarianLoginView(LoginBaseView):
def post(self, request, *args, **kwargs):
response = super().post(request, *args, **kwargs)
if response.status_code == 200:
if not request.user.is_staff:
return Response(
{"message": "Account does not have access"},
status=status.HTTP_403_FORBIDDEN,
expired = timezone.now() + timezone.timedelta(days=1)
payload = {"user_id": user.pk, "exp": expired}
token = jwt.encode(payload, key="secret", algorithm="HS256")
return JsonResponse(
{"message": "Login successful", "token": token},
status=201,
)
else:
pass
return response
@csrf_exempt
def registerUserView(request):
users = User.objects.all()
class RegisterBaseView(views.APIView):
serializer_class = None
def post(self, request):
data = request.data
serializer = self.serializer_class(data=data)
serializer.is_valid(raise_exception=True)
serializer.save()
if request.method == "POST":
data = json.loads(request.body)
username = data.get("username")
email = data.get("email")
password = data.get("password")
user = User.objects.get(id=serializer.data["id"])
login(request=request, user=user)
token, created = Token.objects.get_or_create(user=user)
is_username = users.filter(username=username)
is_email = users.filter(email=email)
response = serializer.data.copy()
response["token"] = token.key
return Response(response, status=status.HTTP_200_OK)
if is_username.exists() or is_email.exists():
return JsonResponse(
{"message": "Username or email already exists"}, status=400
)
user = User.objects.create_user(
username=username, email=email, password=password
)
expired = timezone.now() + timezone.timedelta(days=1)
payload = {"user_id": user.pk, "exp": expired}
token = jwt.encode(payload, key="secret", algorithm="HS256")
data = {
"message": "register successful",
"token": token,
}
return JsonResponse(data, status=201, safe=False)
class LibrarianRegisterView(RegisterBaseView):
serializer_class = UserSerializer
@csrf_exempt
def logoutUserView(request):
class MemberRegisterView(RegisterBaseView):
serializer_class = UserSerializer
if request.method == "GET":
logout(request)
return JsonResponse({"message": "Logout successful"}, status=200)
def post(self, request):
response = super().post(request)
user_id = response.data.get("id")
user = User.objects.get(pk=user_id)
Member.objects.create(user=user)
return JsonResponse({"message": "Invalid request method"}, status=405)
return response
@csrf_exempt
def getUserDetail(request):
if request.method == "GET":
header_authorization = request.headers.get("Authorization")
try:
token = header_authorization.split(" ")[1]
payload = jwt.decode(token, key="secret", algorithms=["HS256"])
user = User.objects.get(pk=payload["user_id"])
data = {
"id": user.id,
"username": user.username,
"email": user.email,
"first_name": user.first_name,
"last_name": user.last_name,
"is_staff": user.is_staff,
}
return JsonResponse(data, status=200, safe=False)
except jwt.exceptions.InvalidTokenError:
return JsonResponse({"message": "Unauthorized"}, status=401)
else:
return JsonResponse({"message": "Invalid request method"}, status=405)
class MemberLoginView(LoginBaseView):
def post(self, request, *args, **kwargs):
response = super().post(request, *args, **kwargs)
@csrf_exempt
def updateUserProfileView(request):
users = User.objects.all()
if response.status_code == 200:
if request.user.is_staff:
return Response(
{"message": "Account does not have access"},
status=status.HTTP_403_FORBIDDEN,
if request.method == "PUT":
data = json.loads(request.body)
header_authorization = request.headers.get("Authorization")
try:
token = header_authorization.split(" ")[1]
payload = jwt.decode(token, key="secret", algorithms=["HS256"])
user = User.objects.get(pk=payload["user_id"])
if data.get("username") != user.username:
is_username = users.filter(username=data.get("username"))
if is_username.exists():
return JsonResponse(
{"message": "Username already exists"}, status=400
)
else:
pass
return response
if data.get("email") != user.email:
is_email = users.filter(email=data.get("email"))
if is_email.exists():
return JsonResponse({"message": "Email already exists"}, status=400)
class LogoutView(views.APIView):
user.username = data.get("username", user.username)
user.first_name = data.get("first_name", user.first_name)
user.last_name = data.get("last_name", user.last_name)
user.email = data.get("email", user.email)
user.save()
def get(self, request):
header = request.headers.get("Authorization")
if header is None:
return Response(
{"message": "Unauthorized"}, status=status.HTTP_401_UNAUTHORIZED
return JsonResponse(
{"message": "User profile updated successfully"}, status=200
)
token = header.split(" ")[1]
verified_token = Token.objects.filter(key=token)
if not verified_token.exists():
return Response(
{"message": "Token is invalid or expired"},
status=status.HTTP_401_UNAUTHORIZED,
)
except jwt.exceptions.InvalidTokenError:
return JsonResponse({"message": "Unauthorized"}, status=401)
verified_token.delete()
logout(request=request)
return JsonResponse({"message": "Invalid request method"}, status=405)
return Response({"message": "Logout success"}, status=status.HTTP_200_OK)
@csrf_exempt
def changePasswordView(request):
if request.method == "PUT":
data = json.loads(request.body)
header_authorization = request.headers.get("Authorization")
class MemberChangePasswordView(views.APIView):
permission_classes = [IsNotStaffUser]
try:
token = header_authorization.split(" ")[1]
payload = jwt.decode(token, key="secret", algorithms=["HS256"])
user = User.objects.get(pk=payload["user_id"])
old_password = data.get("old_password")
new_password1 = data.get("new_password1")
new_password2 = data.get("new_password2")
if not user.check_password(old_password):
return JsonResponse(
{"message": "Invalid old password"},
status=400,
)
def post(self, request, member_id):
new_password = request.data.get("new_password")
old_password = request.data.get("old_password")
member = Member.objects.get(pk=member_id)
user = member.user
if new_password1 != new_password2:
return JsonResponse(
{"message": "Passwords and confirm password do not match"},
status=400,
)
if user.check_password(old_password):
user.set_password(new_password)
user.set_password(str(new_password1))
user.save()
return Response(
{"message": "Pasword succesfuly changed"}, status=status.HTTP_200_OK
)
return Response(
{"message": "Change password failed, old password is invalid."},
status=status.HTTP_403_FORBIDDEN,
)
return JsonResponse({"message": "Change password successful"}, status=200)
except jwt.exceptions.InvalidTokenError:
return JsonResponse({"message": "Unauthorized"}, status=401)
return JsonResponse({"message": "Invalid request method"}, status=405)
class TokenResetPasswordView(views.APIView):
def generate_random_pin(self):
def generate_random_pin():
return random.randint(10000000, 99999999)
def store_data_with_pin(self, user):
pin = self.generate_random_pin()
ResetPasswordPin.objects.get_or_create(pin=pin, user=user)
def store_data_with_pin(user):
is_pin = ResetPasswordPin.objects.filter(user=user).first()
if is_pin is None:
pin = generate_random_pin()
ResetPasswordPin.objects.create(pin=generate_random_pin(), user=user)
else:
pin = is_pin.pin
return pin
def post(self, request):
data = request.data.copy()
@csrf_exempt
def resetPasswordView(request):
users = User.objects.all()
if request.method == "POST":
data = json.loads(request.body)
email = data.get("email")
user = User.objects.get(email=email)
if user is None:
return Response(
{"message": "Invalid Email, Request pin reset password failed"},
status=status.HTTP_403_FORBIDDEN,
)
try:
user = users.get(email=email)
pin = store_data_with_pin(user)
pin = self.store_data_with_pin(user)
message = f"Here's your reset password pin: {pin}"
send_mail(
user.email_user(
subject="Django Library App Reset password pin, dev: Ilham Maulana",
message=message,
from_email="from@example.com",
recipient_list=["to@example.com"],
fail_silently=False,
)
return JsonResponse(
{"message": "Pin reset password sent successfully to your email"},
status=200,
)
except User.DoesNotExist:
return JsonResponse(
{"message": "User with this email does not exist"}, status=400
)
return JsonResponse({"message": "Invalid request method"}, status=405)
@csrf_exempt
def resetPasswordConfirmView(request):
if request.method == "POST":
data = json.loads(request.body)
password1 = data.get("password1")
password2 = data.get("password2")
pin = data.get("pin")
data["message"] = (
"Your pin request was successful! We've sent an email with instructions on how to use it."
if password1 is None or password2 is None:
return JsonResponse(
{"message": "Password and confirm password are required"}, status=400
)
return Response(data, status=status.HTTP_200_OK)
if password1 != password2:
return JsonResponse(
{"message": "Password and confirm password do not match"}, status=400
)
if pin is None:
return JsonResponse({"message": "Pin is required"}, status=400)
class ResetPasswordConfirmView(views.APIView):
pin = ResetPasswordPin.objects.filter(pin=pin).first()
if pin is None:
return JsonResponse({"message": "Pin is invalid"}, status=400)
def post(self, request):
data = request.data
if password1 != password2:
return JsonResponse(
{"message": "Passwords and confirm password do not match"},
status=400,
)
pin = data.get("pin")
password1 = data.get("password1")
password2 = data.get("password2")
encoded = None
pin.user.set_password(password1)
pin.delete()
return JsonResponse({"message": "Password reset successful"}, status=200)
return JsonResponse({"message": "Invalid request method"}, status=405)
is_password_invalid = password1 != password2
if is_password_invalid:
return Response(
{"message": "password and confirm password are not same"},
status=status.HTTP_400_BAD_REQUEST,
@csrf_exempt
def checkAuthSessionView(request):
if request.method == "GET":
if request.user.is_authenticated:
return JsonResponse(
{"message": "User is authenticated", "authenticated": True}, status=200
)
else:
return JsonResponse(
{"message": "User is not authenticated", "authenticated": False},
status=401,
)
else:
return JsonResponse({"message": "Invalid request method"}, status=405)
@csrf_exempt
def memberLoanView(request):
header_authorization = request.headers.get("Authorization")
book_loans = BookLoan.objects.all()
if request.method == "GET":
now = timezone.now()
due_date_treshold = now + timezone.timedelta(days=3)
near_outstanding = request.GET.get("near_outstanding")
overdue = request.GET.get("overdue")
try:
encoded = ResetPasswordPin.objects.get(pin=pin)
except ResetPasswordPin.DoesNotExist:
return Response(
{"message": "Invalid pin reset password"},
status=status.HTTP_401_UNAUTHORIZED,
token = header_authorization.split(" ")[1]
payload = jwt.decode(token, key="secret", algorithms=["HS256"])
user = User.objects.get(pk=payload["user_id"])
member = Member.objects.filter(user=user).first()
if member is None:
return JsonResponse({"message": "Member not found"}, status=404)
is_loans = book_loans.filter(member=member).first()
if is_loans is None:
return JsonResponse({"message": "No loans found"}, status=404)
loans = book_loans.filter(member=member)
if near_outstanding:
loans = (
loans.filter(due_date__lte=due_date_treshold, return_date=None)
.filter(due_date__gte=now)
.order_by("loan_date")
)
encoded.user.set_password(password1)
encoded.user.save()
encoded.delete()
return Response(
{"message": "Reset password success"},
status=status.HTTP_200_OK,
if overdue:
loans = loans.filter(due_date__lte=now, return_date=None).order_by(
"loan_date"
)
data = []
for loan in loans:
remaining_loan_time = str(loan.due_date.day - now.day) + " days left"
is_overdue = loan.due_date < now
loan_obj = {
"book": {
"id": loan.book.id,
"title": loan.book.title,
"author": loan.book.author,
"description": loan.book.description,
"cover_image": loan.book.cover_image.url,
},
"remaining_loan_time": remaining_loan_time,
"is_overdue": is_overdue,
"loan_date": loan.loan_date,
"due_date": loan.due_date,
}
data.append(loan_obj)
class UpdateProfileView(viewsets.ModelViewSet):
serializer_class = UpdateProfileSerializer
queryset = User.objects.all().order_by("id")
book_loans.filter(due_date__lte=now, return_date=None).order_by("loan_date")
def update(self, request, pk):
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data)
serializer.is_valid(raise_exception=True)
serializer.save()
return JsonResponse(data, safe=False, status=200)
user_id = serializer.data.get("id")
user = User.objects.get(pk=user_id)
except jwt.exceptions.InvalidTokenError:
return JsonResponse({"message": "Unauthorized"}, status=401)
account_id = None
if user.is_staff:
account_id = user.librarian.id
else:
account_id = user.member.id
if request.method == "POST":
data = json.loads(request.body)
book_id = data.get("book")
member_id = data.get("member")
loan_date = data.get("loan_date")
due_date = data.get("due_date")
try:
token = header_authorization.split(" ")[1]
jwt.decode(token, key="secret", algorithms=["HS256"])
member = Member.objects.filter(user__pk=member_id).first()
book = Book.objects.filter(pk=book_id).first()
if member is None:
return JsonResponse({"message": "Member not found"}, status=404)
if book is None:
return JsonResponse({"message": "Book not found"}, status=404)
book_loans.create(
member=member, book=book, loan_date=loan_date, due_date=due_date
)
return JsonResponse(
{"message": "create loan successfull"}, safe=False, status=200
)
except jwt.exceptions.InvalidTokenError:
return JsonResponse({"message": "Unauthorized"}, status=401)
response = serializer.data
response["account_id"] = account_id
return Response(response, status=status.HTTP_200_OK)
return JsonResponse({"message": "Invalid request method"}, status=405)
import json
from django.http import JsonResponse
from django.core.serializers import serialize
from django.views.decorators.csrf import csrf_exempt
from rest_framework import viewsets
from rest_framework.response import Response
from rest_framework.filters import SearchFilter
from django_filters.rest_framework import DjangoFilterBackend
from .serializers import Book, BookSerializer, Category, CategorySerializer
from book.models import Book, Category
from .serializers import BookSerializer, CategorySerializer
@csrf_exempt
def bookView(request):
books = Book.objects.all().order_by("created_at")
category = request.GET.get("category")
keyword = request.GET.get("search")
if request.method == "GET":
if category:
books = books.filter(category__name=category)
if keyword and len(keyword) >= 3:
books = books.filter(title__icontains=keyword)
data = []
for book_item in books:
if book_item.category is not None:
book = {
"id": book_item.id,
"title": book_item.title,
"author": book_item.author,
"description": book_item.description,
"cover_image": "http://127.0.0.1:8000" + book_item.cover_image.url,
"category": {
"name": book_item.category.name,
},
}
book = {
"id": book_item.id,
"title": book_item.title,
"author": book_item.author,
"description": book_item.description,
"cover_image": "http://127.0.0.1:8000" + book_item.cover_image.url,
}
data.append(book)
return JsonResponse(data, status=200, safe=False)
return JsonResponse({"message": "Invalid request method"}, status=405)
class BookViewSet(viewsets.ModelViewSet):
......
......@@ -2,22 +2,19 @@ from django.urls import path, include
from rest_framework import routers
from .auth.views import (
LibrarianViewSet,
LibrarianLoginView,
LibrarianRegisterView,
LibrarianLoginHistoryViewSet,
MemberViewSet,
MemberLoginView,
MemberRegisterView,
MemberChangePasswordView,
LogoutView,
TokenResetPasswordView,
ResetPasswordConfirmView,
UserDetailView,
UpdateProfileView,
LoginBaseView,
registerUserView,
loginUserView,
logoutUserView,
getUserDetail,
memberLoanView,
updateUserProfileView,
checkAuthSessionView,
changePasswordView,
resetPasswordView,
resetPasswordConfirmView,
)
from .book.views import BookViewSet, CategoryViewSet
from .book.views import bookView, CategoryViewSet
from .loans.views import (
BookLoanViewSet,
OverduedBookLoanViewSet,
......@@ -27,10 +24,6 @@ from .loans.views import (
router = routers.DefaultRouter()
router.register(r"user", UpdateProfileView, basename="user")
router.register(r"librarians", LibrarianViewSet, basename="librarians")
router.register(r"members", MemberViewSet, basename="members")
router.register(r"books", BookViewSet, basename="books")
router.register(r"categories", CategoryViewSet, basename="categories")
router.register(r"book-loans", BookLoanViewSet, basename="book_loans")
router.register(
......@@ -39,9 +32,6 @@ router.register(
router.register(
r"upcoming-loans", UpComingBookLoanViewSet, basename="book_loans_upcoming"
)
router.register(
r"login-history", LibrarianLoginHistoryViewSet, basename="librarian_login_history"
)
router_member_loan = routers.DefaultRouter()
router_member_loan.register(r"loans", MemberLoanViewSet, basename="member_loans")
......@@ -49,49 +39,20 @@ router_member_loan.register(r"loans", MemberLoanViewSet, basename="member_loans"
urlpatterns = [
path("", include(router.urls)),
# auth
path(
"user",
UserDetailView.as_view(),
name="user_detail",
),
path(
"user",
UserDetailView.as_view(),
name="user_detail",
),
path(
"reset-password/request-token",
TokenResetPasswordView.as_view(),
name="reset_password_request_token",
),
path(
"reset-password/confirm",
ResetPasswordConfirmView.as_view(),
path("user", getUserDetail, name="user_detail"),
path("user/loans", memberLoanView, name="user_loans"),
path("user/update", updateUserProfileView, name="update_user_profile"),
path("auth/login", loginUserView, name="login"),
path("auth/logout", logoutUserView, name="logout"),
path("auth/register", registerUserView, name="register"),
path("auth/change-password", changePasswordView, name="change_password"),
path("auth/reset-password", resetPasswordView, name="reset_password"),
path(
"auth/reset-password-confirm",
resetPasswordConfirmView,
name="reset_password_confirm",
),
path("librarians/auth/login", LibrarianLoginView.as_view(), name="librarian_login"),
path("auth/login", LoginBaseView.as_view(), name="universal_login"),
path(
"librarians/auth/register",
LibrarianRegisterView.as_view(),
name="librarian_register",
),
path("auth/logout", LogoutView.as_view(), name="librarian_logout"),
path("members/auth/login", MemberLoginView.as_view(), name="member_login"),
path(
"members/auth/register",
MemberRegisterView.as_view(),
name="librarian_register",
),
# change password
path(
"members/<int:member_id>/change-password",
MemberChangePasswordView.as_view(),
name="member_change_password",
),
path(
"members/<int:member_id>/",
include(router_member_loan.urls),
name="member_loans",
),
path("auth/check-auth-session", checkAuthSessionView, name="check_auth_session"),
# books
path("books", bookView, name="books"),
]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment