package de.midlane_illaoi.eventflow; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TreeSet; import de.midlane_illaoi.eventflow.event.ObservableEvent; import de.midlane_illaoi.eventflow.event.ObserverSortingEvent; public class ObservableEventDispatcher { private Map>, Collection> observersMap; private List> queuedModifications; class QueuedModification{ private Class> eventType; private T observer; private boolean isSubscription; public QueuedModification(Class> eventType, T observer, boolean isSubscription) { super(); this.eventType = eventType; this.observer = observer; this.isSubscription = isSubscription; } public Class> getEventType() { return eventType; } public T getObserver() { return observer; } public boolean isSubscription() { return isSubscription; } } /** * This variable serves as a recursion counter for nested event dispatching. * */ private int isDispatching; public ObservableEventDispatcher() { isDispatching = 0; observersMap = new HashMap<>(); queuedModifications = new LinkedList>(); } /** * Currently this dispacther favors performance over functionality. * It is currently not possible to change active listeners with a listener for the currently dispatched event. * Anyways it is possible to change subscribed listener with a listener. * The changes will become active after the dispatching of the currently dispatched event and dispatching of all events that caused the currently dispatched event stops. * */ public void subscribe(Class> eventType, T observer) { Collection computeIfAbsent; if( isDispatching > 0 ) { queuedModifications.add( new QueuedModification(eventType, observer, true) ); }else { computeIfAbsent = getOrCreateListenerSet(eventType); computeIfAbsent.add(observer); } } /** * Currently this dispacther favors performance over functionality. * It is currently not possible to change active listeners with a listener for the currently dispatched event. * Anyways it is possible to change subscribed listener with a listener. * The changes will become active after the dispatching of the currently dispatched event stops. * * @return true if the listener was currently subscribed and removed. false other wise. * false is also returned in case the dispatcher is currently dispatching and queues the removal. * */ public boolean unsubscribe(Class> eventType, T observer) { Collection computeIfAbsent; if( isDispatching > 0 ) { queuedModifications.add( new QueuedModification(eventType, observer, false) ); return false; }else { computeIfAbsent = getOrCreateListenerSet(eventType); return computeIfAbsent.remove(observer); } } @SuppressWarnings({ "unchecked"}) private Collection getOrCreateListenerSet(Class> eventType) { Collection computeIfAbsent = observersMap.computeIfAbsent(eventType, key -> { if( ObserverSortingEvent.class.isAssignableFrom(eventType) ) { //ObserverSortingEvent eventInstance = getSortableEventTypeInstance((Class) eventType); ObserverSortingEvent eventInstance = getSortableEventTypeInstance(castObservableEventClassToObservableEventClass(eventType)); if( eventInstance.isComparatorUsedForAllEventsOfType() ) { Comparator observerComparator = (Comparator) eventInstance.getObserverComparator(); return new TreeSet( observerComparator ); } } return new ArrayList(); }); return (Collection) computeIfAbsent; } @SuppressWarnings("unchecked") private Class castObservableEventClassToObservableEventClass(Class> eventType){ return (Class) eventType; } private T getSortableEventTypeInstance( Class eventType ) { Constructor constructor = null; Boolean accessibility = null; try { // Get the (private) default constructor constructor = eventType.getDeclaredConstructor(); accessibility = constructor.isAccessible(); // Set the accessibility to true constructor.setAccessible(true); // Create an instance using the private constructor return (T) constructor.newInstance(); } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException | SecurityException e) { String errorMessage = String.format("Cannot subscribe to events of type: %s. Events that implement the %s interface must implement a (private) default constructor.", eventType.getName(), ObserverSortingEvent.class.getName()); throw new RuntimeException( errorMessage, e); }finally { if( accessibility != null && constructor != null) { constructor.setAccessible(accessibility); } } } public void dispatchEvent(ObservableEvent event) { isDispatching++; try { List> eventTypes = getAllEventSuperclasses(event.getClass()); for (Class eventType : eventTypes) { if(!observersMap.containsKey(eventType)) { continue; } //List observers = (List) observersMap.get(eventType); /* * Make a copy of the observers list. * * Observers can now unsubscribe while being notified. * * Alternative solution: Queuing Unsubscribe Calls: * * * Pros: * Lower cpu usage * Lower Memory Usage: There's no need to create copies of observer lists, so memory usage remains low. * Deferred Unsubscription: Unsubscribe calls are deferred until after event dispatch, allowing observers to continue receiving events during the current dispatch cycle. * * Cons: * Complexity: Managing a queue of unsubscribe commands adds complexity to the implementation. * Potential Delay: Unsubscribed observers may receive events during the current dispatch cycle. * */ /* * * @SuppressWarnings("unchecked") * List observers = new ArrayList<>( (Set) observersMap.get(eventType) ); */ @SuppressWarnings("unchecked") Collection observers = (Collection) observersMap.get(eventType); /* * currently for a ObserverSortingEvent all listeners * get sorted for every dispatched event. * * This may slow down execution. * An alternative is to keep the observers for such event types * in a sorted list * */ if (event instanceof ObserverSortingEvent) { ObserverSortingEvent sortingEvent = (ObserverSortingEvent) event; if( !sortingEvent.isComparatorUsedForAllEventsOfType() ) { observers = new ArrayList<>( observers ); @SuppressWarnings("unchecked") Comparator observerComparator = (Comparator) sortingEvent.getObserverComparator(); ((ArrayList)observers).sort( observerComparator ); } } observers.forEach(observer -> { event.accept(observer); }); } /* Class> eventType = getEventType(event); @SuppressWarnings("unchecked") List observers = (List) observersMap.computeIfAbsent(eventType, key -> new ArrayList()); observers.forEach(observer -> { event.accept(observer); }); */ }catch (Exception e){ throw e; } finally { isDispatching--; } /* * code changed at 15.07.2024 * old code did not check for * if( isDispatching == 0 ) { * */ if( isDispatching == 0 ) { for( QueuedModification queuedModification : queuedModifications ) { if( queuedModification.isSubscription() ) { subscribeHelper(queuedModification); }else { unsubscribeHelper(queuedModification); } } queuedModifications.clear(); } } private void subscribeHelper(QueuedModification queuedModification) { subscribe(queuedModification.getEventType(), queuedModification.getObserver()); } private void unsubscribeHelper(QueuedModification queuedModification) { unsubscribe(queuedModification.getEventType(), queuedModification.getObserver()); } private List> getAllEventSuperclasses(Class eventType) { List> superclasses = new ArrayList<>(); while (eventType != null && ObservableEvent.class.isAssignableFrom(eventType)) { superclasses.add(eventType); eventType = eventType.getSuperclass(); } return superclasses; } @SuppressWarnings({ "unchecked", "unused" }) private Class> getEventType(ObservableEvent event) { @SuppressWarnings("rawtypes") Class eventClass = event.getClass(); return (Class>) eventClass; } }