Unverified Commit d47ff961 authored by Birte Kristina Friesel's avatar Birte Kristina Friesel
Browse files

worker: add EFA support

parent 3a955c01
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -939,6 +939,7 @@ sub startup {
							db         => $db,
							journey    => $journey,
							stop       => $found,
							trip_id    => $trip_id,
							backend_id => $self->stations->get_backend_id(
								efa => $opt{efa}
							),
+79 −0
Original line number Diff line number Diff line
@@ -185,6 +185,85 @@ sub run {
			next;
		}

		if ( $entry->{is_efa} ) {
			eval {
				$self->app->efa->get_journey_p(
					trip_id => $train_id,
					service => $entry->{backend_name}
				)->then(
					sub {
						my ($journey) = @_;

						my $found_dep;
						my $found_arr;
						for my $stop ( $journey->route ) {
							if ( $stop->id_num == $dep ) {
								$found_dep = $stop;
							}
							if ( $arr and $stop->id_num == $arr ) {
								$found_arr = $stop;
								last;
							}
						}
						if ( not $found_dep ) {
							$self->app->log->debug(
								"Did not find $dep within journey $train_id");
							return;
						}

						if ( $found_dep->rt_dep ) {
							$self->app->in_transit->update_departure_efa(
								uid     => $uid,
								journey => $journey,
								stop    => $found_dep,
								dep_eva => $dep,
								arr_eva => $arr,
								trip_id => $train_id,
							);
						}

						if ( $found_arr and $found_arr->rt_arr ) {
							$self->app->in_transit->update_arrival_efa(
								uid     => $uid,
								journey => $journey,
								stop    => $found_arr,
								dep_eva => $dep,
								arr_eva => $arr,
								trip_id => $train_id,
							);
						}
					}
				)->catch(
					sub {
						my ($err) = @_;
						$backend_issues += 1;
						$self->app->log->error(
"work($uid) @ EFA $entry->{backend_name}: journey: $err"
						);
					}
				)->wait;

				if (    $arr
					and $entry->{real_arr_ts}
					and $now->epoch - $entry->{real_arr_ts} > 600 )
				{
					$self->app->checkout_p(
						station => $arr,
						force   => 2,
						dep_eva => $dep,
						arr_eva => $arr,
						uid     => $uid
					)->wait;
				}
			};
			if ($@) {
				$errors += 1;
				$self->app->log->error(
					"work($uid) @ EFA $entry->{backend_name}: $@");
			}
			next;
		}

		if ( $entry->{is_motis} ) {

			eval {
+101 −1
Original line number Diff line number Diff line
@@ -176,7 +176,7 @@ sub add {
				train_type         => $journey->type // q{},
				train_line         => $journey->line,
				train_no           => $journey->number // q{},
				train_id           => $journey->id,
				train_id           => $opt{trip_id},
				sched_departure    => $stop->sched_dep,
				real_departure     => $stop->rt_dep // $stop->sched_dep,
				route              => $json->encode( \@route ),
@@ -995,6 +995,43 @@ sub update_departure_dbris {
	);
}

sub update_departure_efa {
	my ( $self, %opt ) = @_;
	my $uid     = $opt{uid};
	my $db      = $opt{db} // $self->{pg}->db;
	my $dep_eva = $opt{dep_eva};
	my $arr_eva = $opt{arr_eva};
	my $journey = $opt{journey};
	my $stop    = $opt{stop};
	my $json    = JSON->new;

	my $res_h = $db->select( 'in_transit', ['data'], { user_id => $uid } )
	  ->expand->hash;
	my $ephemeral_data = $res_h ? $res_h->{data} : {};
	if ( $stop->rt_dep ) {
		$ephemeral_data->{rt} = 1;
	}

	say "UPDATE dep WHERE $uid $opt{trip_id} $dep_eva $arr_eva";

	# selecting on user_id and train_no avoids a race condition if a user checks
	# into a new train while we are fetching data for their previous journey. In
	# this case, the new train would receive data from the previous journey.
	$db->update(
		'in_transit',
		{
			data           => $json->encode($ephemeral_data),
			real_departure => $stop->rt_dep,
		},
		{
			user_id             => $uid,
			train_id            => $opt{trip_id},
			checkin_station_id  => $dep_eva,
			checkout_station_id => $arr_eva,
		}
	);
}

sub update_departure_motis {
	my ( $self, %opt ) = @_;
	my $uid      = $opt{uid};
@@ -1188,6 +1225,69 @@ sub update_arrival_dbris {
	);
}

sub update_arrival_efa {
	my ( $self, %opt ) = @_;
	my $uid     = $opt{uid};
	my $db      = $opt{db} // $self->{pg}->db;
	my $dep_eva = $opt{dep_eva};
	my $arr_eva = $opt{arr_eva};
	my $journey = $opt{journey};
	my $stop    = $opt{stop};
	my $json    = JSON->new;

	my $res_h
	  = $db->select( 'in_transit', [ 'data', 'route' ], { user_id => $uid } )
	  ->expand->hash;
	my $ephemeral_data = $res_h ? $res_h->{data}  : {};
	my $old_route      = $res_h ? $res_h->{route} : [];

	if ( $stop->rt_arr ) {
		$ephemeral_data->{rt} = 1;
	}

	my @route;
	for my $j_stop ( $journey->route ) {
		push(
			@route,
			[
				$j_stop->full_name,
				$j_stop->id_num,
				{
					sched_arr => _epoch( $j_stop->sched_arr ),
					sched_dep => _epoch( $j_stop->sched_dep ),
					rt_arr    => _epoch( $j_stop->rt_arr ),
					rt_dep    => _epoch( $j_stop->rt_dep ),
					arr_delay => $j_stop->arr_delay,
					dep_delay => $j_stop->dep_delay,
					efa_load  => $j_stop->occupancy,
					lat       => $j_stop->latlon->[0],
					lon       => $j_stop->latlon->[1],
				}
			]
		);
	}

	say "UPDATE arr WHERE $uid $opt{trip_id} $dep_eva $arr_eva";

	# selecting on user_id and train_no avoids a race condition if a user checks
	# into a new train while we are fetching data for their previous journey. In
	# this case, the new train would receive data from the previous journey.
	$db->update(
		'in_transit',
		{
			data         => $json->encode($ephemeral_data),
			real_arrival => $stop->rt_arr,
			route        => $json->encode( [@route] ),
		},
		{
			user_id             => $uid,
			train_id            => $opt{trip_id},
			checkin_station_id  => $dep_eva,
			checkout_station_id => $arr_eva,
		}
	);
}

sub update_arrival_motis {
	my ( $self, %opt ) = @_;
	my $uid      = $opt{uid};